|
| 1 | +import json |
| 2 | +import sys |
| 3 | +from typing import Optional, Dict, Iterable |
| 4 | + |
| 5 | +import influxdb |
| 6 | +import requests |
| 7 | + |
| 8 | + |
| 9 | +def get_config() -> dict: |
| 10 | + with open('config.json', 'r') as f: |
| 11 | + return json.loads(f.read()) |
| 12 | + |
| 13 | + |
| 14 | +def query_spaceapi(endpoint: str) -> Optional[dict]: |
| 15 | + """ |
| 16 | + Query the specified SpaceAPI endpoint and return the 'sensors' key. |
| 17 | + """ |
| 18 | + resp = requests.get(endpoint) |
| 19 | + try: |
| 20 | + data = resp.json() |
| 21 | + except json.decoder.JSONDecodeError: |
| 22 | + print('Endpoint response is not valid JSON') |
| 23 | + sys.exit(2) |
| 24 | + if 'api' not in data or 'space' not in data: |
| 25 | + print('Endpoint response does not look like a valid SpaceAPI object') |
| 26 | + sys.exit(2) |
| 27 | + return data.get('sensors', []) |
| 28 | + |
| 29 | + |
| 30 | +def make_datapoint(name: str, value: float, tags: Dict[str, str]): |
| 31 | + return { |
| 32 | + 'measurement': name, |
| 33 | + 'tags': tags, |
| 34 | + 'fields': { |
| 35 | + 'value': value, |
| 36 | + }, |
| 37 | + } |
| 38 | + |
| 39 | + |
| 40 | +def main(endpoint: str): |
| 41 | + config = get_config() |
| 42 | + |
| 43 | + # Fetch sensors |
| 44 | + sensors = query_spaceapi(endpoint) |
| 45 | + |
| 46 | + # List of datapoints |
| 47 | + datapoints = [] |
| 48 | + |
| 49 | + # Helper function for all sensors that have a numeric "value" field. |
| 50 | + def process_sensor(name: str, req_tags: Iterable[str], opt_tags: Iterable[str]): |
| 51 | + if not sensors.get(name): |
| 52 | + return |
| 53 | + for s in sensors[name]: |
| 54 | + tags = {} |
| 55 | + for tag in req_tags: |
| 56 | + tags[tag] = s[tag] |
| 57 | + for tag in opt_tags: |
| 58 | + if s.get(tag): |
| 59 | + tags[tag] = s[tag] |
| 60 | + datapoints.append(make_datapoint( |
| 61 | + name=name, |
| 62 | + value=float(s['value']), |
| 63 | + tags=tags, |
| 64 | + )) |
| 65 | + |
| 66 | + process_sensor('beverage_supply', ['unit'], ['name', 'location']) |
| 67 | + process_sensor('network_connections', [], ['type', 'name', 'location']) |
| 68 | + process_sensor('people_now_present', [], ['name', 'location']) |
| 69 | + process_sensor('temperature', ['unit', 'location'], ['name']) |
| 70 | + process_sensor('total_member_count', [], ['name', 'location']) |
| 71 | + |
| 72 | + # Send to InfluxDB |
| 73 | + client = influxdb.InfluxDBClient( |
| 74 | + config['influxdb_host'], config['influxdb_port'], |
| 75 | + config['influxdb_user'], config['influxdb_pass'], |
| 76 | + database=config['influxdb_db'], |
| 77 | + ssl=True, verify_ssl=True, timeout=10, |
| 78 | + ) |
| 79 | + client.write_points(datapoints) |
| 80 | + |
| 81 | + print('OK, sent %d datapoints' % len(datapoints)) |
| 82 | + |
| 83 | + |
| 84 | +if __name__ == '__main__': |
| 85 | + if len(sys.argv) != 2: |
| 86 | + print('Usage: %s <spaceapi-endpoint>' % sys.argv[0]) |
| 87 | + sys.exit(1) |
| 88 | + main(sys.argv[1]) |
0 commit comments