-
Notifications
You must be signed in to change notification settings - Fork 595
Expand file tree
/
Copy pathcelerymon.py
More file actions
96 lines (75 loc) · 2.46 KB
/
celerymon.py
File metadata and controls
96 lines (75 loc) · 2.46 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
# coding=utf-8
"""
Collects simple task stats out of a running celerymon process
#### Dependencies
* celerymon connected to celery broker
Example config file CelerymonCollector.conf
```
enabled=True
host=celerymon.example.com
port=16379
```
"""
import diamond.collector
from urllib.request import urlopen
import time
try:
import json
except ImportError:
import simplejson as json
class CelerymonCollector(diamond.collector.Collector):
    """Collector that publishes per-task state counts read from celerymon.

    On every collection the celerymon HTTP API is asked for the tasks seen
    since the previous collection. The collector then publishes how many of
    those tasks are in each state, grouped by task name, together with an
    overall ``total_messages`` count.
    """

    # Timestamp (seconds since the epoch) sent as the ``since`` argument of
    # the next API query; ``None`` until the first collection has run.
    LastCollectTime = None

    def get_default_config_help(self):
        """
        Returns the help text for the collector's configuration options
        """
        config_help = super(CelerymonCollector, self).get_default_config_help()
        config_help.update({
            # NOTE(review): 'celerymon' reads like a default value rather
            # than a help text -- confirm whether it was meant to live in
            # get_default_config() instead. Left unchanged here.
            'path': 'celerymon',
            'host': 'A single hostname to get metrics from',
            'port': 'The celerymon port'
        })
        return config_help

    def get_default_config(self):
        """
        Returns the default collector settings
        """
        config = super(CelerymonCollector, self).get_default_config()
        config.update({
            'host': 'localhost',
            'port': '8989'
        })
        return config

    @staticmethod
    def _count_states(celery_data):
        """
        Tallies the task entries returned by celerymon.

        Each item of ``celery_data`` is indexed at position 1 to obtain a
        mapping holding the task's ``'name'`` and ``'state'``.

        Returns a ``(results, total_messages)`` tuple where ``results`` maps
        task name -> state -> number of occurrences and ``total_messages``
        is the number of items seen.
        """
        results = {}
        total_messages = 0
        for data in celery_data:
            task = data[1]
            states = results.setdefault(str(task['name']), {})
            state = str(task['state'])
            states[state] = states.get(state, 0) + 1
            total_messages += 1
        return results, total_messages

    def collect(self):
        """
        Overrides the Collector.collect method

        Queries ``http://<host>:<port>/api/task/`` for the tasks reported
        since the last collection and publishes ``total_messages`` plus one
        ``<task name>.<state>`` metric per observed combination.

        Errors raised while fetching or decoding the response propagate to
        the caller; in that case ``LastCollectTime`` is not advanced, so the
        next collection asks for the same window again.
        """
        # Handle collection time intervals correctly
        collect_time = int(time.time())
        if self.LastCollectTime is None:
            # First run: look back exactly one collection interval.
            self.LastCollectTime = (
                collect_time - float(self.config['interval']))

        celerymon_url = "http://%s:%s/api/task/?since=%i" % (
            self.config['host'], self.config['port'], self.LastCollectTime)

        # Close the HTTP response deterministically instead of leaving the
        # socket open until the object is garbage collected.
        with urlopen(celerymon_url) as response:
            body = response.read()

        celery_data = json.loads(body)
        results, total_messages = self._count_states(celery_data)

        # Publish Metric
        self.publish('total_messages', total_messages)
        for name, states in results.items():
            for state, count in states.items():
                self.publish("%s.%s" % (name, state), count)

        # Only advance the window once everything has been published.
        self.LastCollectTime = collect_time