
Commit 0084465

Stephan Lammel committed
add CPU usage query function based on ElasticSearch data
1 parent 2048f4a commit 0084465

1 file changed

facility/adm_facility.py

Lines changed: 131 additions & 1 deletion

@@ -984,6 +984,10 @@ def admf_influxdb_jobmon(firstTIS, limitTIS, siteDict, fsssDict):
     # cores, the tags of interest.                                            #
     # ####################################################################### #
     URL_INFLUXDB = "https://monit-grafana.cern.ch/api/datasources/proxy/7731/query?db=monit_production_cmsjm&q=SELECT%%20SUM%%28wavg_count%%29%%20FROM%%20%%22long%%22.%%22condor_1d%%22%%20WHERE%%20%%22Status%%22%%20=%%20%%27Running%%27%%20AND%%20time%%20%%3E=%%20%ds%%20and%%20time%%20%%3C%%20%ds%%20GROUP%%20BY%%20%%22RequestCpus%%22%%2C%%20%%22Site%%22"
+    # -------------------------------------------------------------------------
+    # urllib.parse.unquote(URL_INFLUXDB % (123456789, 987654321))
+    # 'https://monit-grafana.cern.ch/api/datasources/proxy/7731/query?db=monit_production_cmsjm&q=SELECT SUM(wavg_count) FROM "long"."condor_1d" WHERE "Status" = \'Running\' AND time >= 123456789s and time < 987654321s GROUP BY "RequestCpus", "Site"'
+    # -------------------------------------------------------------------------
     HDR_GRAFANA = {'Authorization': "Bearer eyJrIjoiZWRnWXc1bUZWS0kwbWExN011TGNTN2I2S1JpZFFtTWYiLCJuIjoiY21zLXNzYiIsImlkIjoxMX0=", 'Content-Type': "application/x-www-form-urlencoded; charset=UTF-8", 'Accept': "application/json"}
     #
     first15m = int( firstTIS / 86400 ) * 96
@@ -1085,6 +1089,132 @@ def admf_influxdb_jobmon(firstTIS, limitTIS, siteDict, fsssDict):
 
 
 
+def admf_grafana_jobmon(firstTIS, limitTIS, siteDict, fsssDict):
+    """sum up CPU usage from MonIT/ElasticSearch and return a site list"""
+    # ####################################################################### #
+    # fetch summed up core usage times count during firstTIS and limitTIS    #
+    # from MonIT/ElasticSearch and return a list of sites that provided      #
+    # 100 cores or more of CPU during that period.                           #
+    # CMS job monitoring information in InfluxDB/ElasticSearch is aggregated #
+    # from HTCondor 12 minute job snapshots retaining tags. We thus          #
+    # have to aggregate over the tags that are not of interest and sum       #
+    # the product of number-of-cores and usage for each site.                #
+    # ####################################################################### #
+    URL_GRAFANA = "https://monit-grafana.cern.ch/api/datasources/proxy/9475/_msearch"
+    HDR_GRAFANA = {'Authorization': "Bearer eyJrIjoiZWRnWXc1bUZWS0kwbWExN011TGNTN2I2S1JpZFFtTWYiLCJuIjoiY21zLXNzYiIsImlkIjoxMX0=", 'Content-Type': "application/json; charset=UTF-8", 'Accept': "application/json"}
+    #
+    first15m = int( firstTIS / 86400 ) * 96
+    limit15m = int( limitTIS / 86400 ) * 96
+    if ( first15m >= limit15m ):
+        logging.critical("Empty time interval for sites to provide computing")
+        return []
+    #
+    logging.info("Querying ElasticSearch about job core usage via Grafana")
+    logging.log(15, " between %s and %s" %
+                    (time.strftime("%Y-%m-%d", time.gmtime(first15m * 900)),
+                     time.strftime("%Y-%m-%d", time.gmtime((limit15m * 900)-1))))
+
+
+    # prepare Lucene ElasticSearch query:
+    # ===================================
+    queryString = ("{\"search_type\":\"query_then_fetch\",\"ignore_unavailabl" +
+                   "e\":true,\"index\":[\"monit_prod_condor_agg_metric*\"]}" +
+                   "\n{\"query\":{\"bool\":{\"must\":[{\"match_phrase\":{\"d" +
+                   "ata.Status\":\"Running\"}}],\"filter\":{\"range\":{\"met" +
+                   "adata.timestamp\":{\"gte\":%d,\"lt\":%d,\"format\":\"epo" +
+                   "ch_second\"}}}}},\"size\":0,\"aggs\":{\"corehours_per_si" +
+                   "te\":{\"terms\":{\"field\":\"data.Site\",\"size\":512}," +
+                   "\"aggs\":{\"corehours_of_entry\":{\"sum\":{\"script\":{" +
+                   "\"lang\":\"painless\",\"source\":\"doc['data.RequestCpus" +
+                   "'].value * doc['data.wavg_count'].value\"}}}}}}}\n") % \
+                  (first15m * 900, limit15m * 900)
+
+
+    # execute query and receive results from ElasticSearch:
+    # =====================================================
+    try:
+        requestObj = urllib.request.Request(URL_GRAFANA,
+                                            data=queryString.encode("utf-8"),
+                                            headers=HDR_GRAFANA, method="POST")
+        with urllib.request.urlopen( requestObj, timeout=600 ) as responseObj:
+            urlCharset = responseObj.headers.get_content_charset()
+            if urlCharset is None:
+                urlCharset = "utf-8"
+            myData = responseObj.read().decode( urlCharset )
+            del urlCharset
+        #
+        # sanity check:
+        if ( len(myData) < 1024 ):
+            raise ValueError("Job core usage data failed sanity check")
+        #
+        # decode JSON:
+        myJson = json.loads( myData )
+        del myData
+        #
+    except urllib.error.URLError as excptn:
+        logging.error("Failed to query ElasticSearch via Grafana, %s" %
+                      str(excptn))
+        return []
+
+
+    # loop over results and integrate core usage by site:
+    # ===================================================
+    integrationDict = {}
+    for myRspns in myJson['responses']:
+        for myBuckt in myRspns['aggregations']['corehours_per_site']['buckets']:
+            try:
+                mySite = myBuckt['key']
+                try:
+                    myFacility = siteDict[ mySite ]
+                except KeyError:
+                    continue
+                myFsss = myFacility + "___" + mySite
+                if ( myFsss not in fsssDict ):
+                    myFsss = myFacility
+                if ( myFsss not in fsssDict ):
+                    continue
+                myUsage = myBuckt['corehours_of_entry']['value']
+                if ( myFsss in integrationDict ):
+                    integrationDict[ myFsss ] += myUsage
+                else:
+                    integrationDict[ myFsss ] = myUsage
+            except KeyError as excptn:
+                logging.warning("Bad query result entry, skipping, %s" %
+                                str(excptn))
+                continue
+    ackSet = set()
+    myTime = ( limit15m - first15m ) / 4
+    for myFsss in sorted( integrationDict.keys(), reverse=True ):
+        myCPU = integrationDict[ myFsss ] / myTime
+        logging.log(25, "Fsss %s provided %.1f CPU cores" % (myFsss, myCPU))
+        if ( myCPU >= 100.0 ):
+            ackSet.add( fsssDict[myFsss] )
+        else:
+            fsssList = myFsss.split("___")
+            if ( len(fsssList) == 3 ):
+                parentFsss = fsssList[0] + "___" + fsssList[1]
+                if ( parentFsss not in fsssDict ):
+                    parentFsss = fsssList[0]
+            elif ( len(fsssList) == 2 ):
+                parentFsss = fsssList[0]
+            else:
+                continue
+            if ( parentFsss not in fsssDict ):
+                continue
+            if ( parentFsss in integrationDict ):
+                integrationDict[ parentFsss ] += integrationDict[ myFsss ]
+            else:
+                integrationDict[ parentFsss ] = integrationDict[ myFsss ]
+
+
+    logging.info(" found %d fsss'es providing 100 cores or more" %
+                 len(ackSet))
+    #
+    return list( ackSet )
+# ########################################################################### #
+
+
+
 def admf_write_acknowledgement(quarterString, tupleList, filepath = None):
     """write computing acknowledgement LaTex file"""
     # ####################################################################### #
@@ -1851,7 +1981,7 @@ def admf_make_tzlist():
     #
     # get list of sites contributing computing:
     # =========================================
-    compTuple = admf_influxdb_jobmon(frstDay, nextDay, siteDict, fsssDict)
+    compTuple = admf_grafana_jobmon(frstDay, nextDay, siteDict, fsssDict)
     #
     #
     tupleList = sorted( set( diskTuple + compTuple ) )
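
For readability (an illustration, not part of the commit): the escaped queryString above assembles the two NDJSON lines that get POSTed to the _msearch endpoint. Decoded, with the two %d placeholders filled from first15m * 900 and limit15m * 900, they come out roughly as:

    {"search_type": "query_then_fetch", "ignore_unavailable": true,
     "index": ["monit_prod_condor_agg_metric*"]}
    {"query": {"bool": {"must": [{"match_phrase": {"data.Status": "Running"}}],
                        "filter": {"range": {"metadata.timestamp":
                            {"gte": <first second>, "lt": <limit second>,
                             "format": "epoch_second"}}}}},
     "size": 0,
     "aggs": {"corehours_per_site": {
         "terms": {"field": "data.Site", "size": 512},
         "aggs": {"corehours_of_entry": {"sum": {"script": {"lang": "painless",
             "source": "doc['data.RequestCpus'].value * doc['data.wavg_count'].value"}}}}}}}

In other words, one terms-aggregation bucket per site (up to 512 of them), each carrying the sum of RequestCpus times wavg_count over the selected time range.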

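The result loop only touches myJson['responses'][...]['aggregations']['corehours_per_site']['buckets'] and, per bucket, 'key' and 'corehours_of_entry'['value']. The response is therefore assumed to look roughly like the sketch below; the site name and the numbers are made-up illustration values:

    {"responses": [
        {"aggregations": {"corehours_per_site": {"buckets": [
            {"key": "T1_DE_KIT",
             "doc_count": 4321,
             "corehours_of_entry": {"value": 123456.0}},
            ...
        ]}}}
    ]}

Buckets whose site is unknown to siteDict or fsssDict are skipped; everything else is summed per facility___site (or facility) key in integrationDict.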
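A quick sanity check of the normalization behind the 100-core threshold, sketched for an assumed one-day interval (the timestamps are illustrative, not from the commit):

    # illustration only
    firstTIS = 1609459200                      # assumed 2021-01-01 00:00:00 UTC
    limitTIS = firstTIS + 86400                # one day later
    first15m = int( firstTIS / 86400 ) * 96    # 15-minute bins, 96 per day
    limit15m = int( limitTIS / 86400 ) * 96
    myTime = ( limit15m - first15m ) / 4       # 96 bins / 4 = 24.0 hours

A fsss whose summed corehours_of_entry over that day is 2400 thus averages 2400 / 24 = 100.0 cores and just reaches the acknowledgement threshold checked by myCPU >= 100.0.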