-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathsensor_broker
executable file
·222 lines (193 loc) · 7.66 KB
/
sensor_broker
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
#!/usr/bin/python3
import subprocess
import datetime
import pprint
import yaml
import paho.mqtt.client as paho
from flask import Flask
from flask import Response
from flask import request
# Flask application object; the route handlers below register on it.
app = Flask(__name__)
""" Get MQTT client object as global variable to make it available everywhere.
Used by HomeAssistant to get values """
# Connected and started in the __main__ section, used by mqtt_publish().
MQTT_CLIENT = paho.Client("sensor-broker")
""" dict of all the sensor data we have"""
# Keyed by sensor IP. The /sensor endpoint fills each entry with the keys
# "property" (dict of uploaded values), "room", "timestamp" and "refresh_rate".
DATA = {}
""" path to a yaml export of data for HomeAssistant to read from """
# Written by yaml_export_generate().
HA_EXPORT_PATH = "/home/homeassistant/sensor_broker.yaml"
""" Where to store persistent properties """
PERSISTENT_PROP_PATH = "/var/lib/sensor_broker/persistent_properties"
# In-memory copy of the persistent properties; get_persistent_prop() loads it
# from PERSISTENT_PROP_PATH while it is still empty.
PERSISTENT_PROP = {}
""" default refresh rate of a sensor unless provided by sensor itself.
In seconds """
default_refresh_rate = 5*60
""" maps sensor IP to room names """
# Sensors whose IP is not listed here get the room name "Unknown".
room_mapping = {
"10.42.0.10": "attic",
"10.42.0.11": "outside"
}
""" Ping """
@app.route('/')
def root():
    """ Liveness check: always answers with the plain text OK """
    reply = "OK"
    return reply
""" Dump all cached data for debugging"""
@app.route('/dump_data')
def dump_data():
    """ Debugging aid: pretty-printed dump of everything cached in DATA """
    # DATA is only read here, so no global declaration is needed
    snapshot = pprint.pformat(DATA)
    return snapshot
""" Endpoint for sensors to upload data """
@app.route("/sensor", methods=["GET"])
def grafana_search():
    """ Endpoint for sensors to upload data

    The sensor is identified by the remote address of the request. Every
    query-string argument is stored as a property of that sensor in DATA
    and published to MQTT. The only exception is the meta argument
    refresh_rate (seconds), which is stored separately and falls back to
    default_refresh_rate when it is missing or not an integer.

    The two water meter properties of sensor 192.168.1.8 are relative
    readings: they are added to the persistent baseline and the sum
    becomes both the stored value and the new baseline.

    The HomeAssistant yaml export is regenerated when a sensor is seen for
    the first time.

    Returns the plain text "OK".
    """
    # query arguments that describe the sensor itself and are not measurements
    # (trailing comma matters: without it this is a plain string and "in"
    # becomes a substring test that also swallows e.g. "rate" or "fresh")
    meta_properties = ("refresh_rate",)
    # water meter property -> name of its persistent baseline
    water_baselines = {
        "water__type_cold": "cold_water",
        "water__type_hot": "hot_water",
    }

    sensor_ip = request.remote_addr
    room = room_mapping.get(sensor_ip, "Unknown")

    new_sensor = sensor_ip not in DATA
    if new_sensor:
        DATA[sensor_ip] = {"property": {}}

    sensor = DATA[sensor_ip]
    sensor["room"] = room
    sensor["timestamp"] = datetime.datetime.now()
    # type=int makes the lookup return the default both when the argument
    # is missing and when it cannot be converted to an integer
    sensor["refresh_rate"] = request.args.get(
        "refresh_rate", default_refresh_rate, type=int)

    for (prop, value) in request.args.items():
        # skip meta properties
        if prop in meta_properties:
            continue

        # turn relative value of the water meter property
        # into absolute by adding to the persistent baseline
        if sensor_ip == "192.168.1.8" and prop in water_baselines:
            baseline = water_baselines[prop]
            value = int(value) + get_persistent_prop(baseline)
            set_persistent_prop(baseline, value)

        sensor["property"][prop] = value
        mqtt_publish(room=room, property=prop, value=value)
        print("sensor {}: {} = {}\n".format(sensor_ip, prop, value))

    if new_sensor:
        yaml_export_generate()
    return "OK"
@app.route('/metrics')
def prometheus_metrics():
    """ Prometheus endpoint

    Each property becomes a prometheus metric named sensor_<property>.
    If the property name contains __ every following __-separated part is
    turned into a label: the text before its first _ is the label name and
    the rest is the label value, e.g. water__type_cold becomes
    sensor_water{...,type="cold"}. Every sample also carries ip and room
    labels.

    Sensors that are currently unreachable are left out; their number is
    exported as sensor_unreachable.

    Returns a streamed text/plain Response."""
    def escape_label_value(text):
        # the exposition format requires \, " and newline to be escaped
        # inside label values
        return (text.replace("\\", "\\\\")
                    .replace('"', '\\"')
                    .replace("\n", "\\n"))

    def generate_response(data):
        # metrics whose "# TYPE" line has already been emitted
        known_targets = set()
        # sensors that have data older than 2x their refresh_rate
        unreachable_sensors = get_unreachable_sensors()
        for sensor_ip in data:
            if sensor_ip in unreachable_sensors:
                continue
            sensor = data[sensor_ip]
            for prop in sensor["property"]:
                base_name, *raw_labels = prop.split("__")
                metric_name = "sensor_{}".format(base_name)

                labels = [("ip", sensor_ip), ("room", sensor["room"])]
                for raw_label in raw_labels:
                    # partition never raises, unlike split("_")[1], and
                    # keeps underscores that belong to the value
                    label_name, _, label_value = raw_label.partition("_")
                    if label_name:
                        labels.append((label_name, label_value))

                if metric_name not in known_targets:
                    yield "# TYPE {} gauge\n".format(metric_name)
                    known_targets.add(metric_name)

                # labels must be comma separated to be parseable
                label_string = ",".join(
                    '{}="{}"'.format(name, escape_label_value(value))
                    for name, value in labels)
                yield "{}{{{}}} {}\n".format(
                    metric_name, label_string, sensor["property"][prop])
        yield "# TYPE sensor_unreachable gauge\n"
        yield "sensor_unreachable {}\n".format(len(unreachable_sensors))
    return Response(generate_response(DATA), mimetype='text/plain')
@app.route('/unreachable_sensors')
def unreachable_sensors():
    """ Debugging helper: pretty-print the cached data of every sensor
    that is currently reported as unreachable """
    stale = {ip: DATA[ip] for ip in get_unreachable_sensors()}
    return pprint.pformat(stale)
def get_unreachable_sensors():
    """ Return the list of IPs of unreachable sensors, i.e. sensors whose
    latest data is older than 2x their refresh rate """
    def is_stale(entry):
        # age of the last upload compared with twice the expected interval
        age = datetime.datetime.now() - entry["timestamp"]
        return age.total_seconds() > 2 * entry["refresh_rate"]

    return [ip for ip, entry in DATA.items() if is_stale(entry)]
@app.route('/notify', methods=["GET"])
def notify():
    """ Immediately notify on telegram

    Reads the mandatory query argument message and posts it to the
    Telegram sendMessage API by running curl.

    Returns whatever curl wrote to stdout followed by its stderr (bytes).
    A request without the message argument is rejected by Flask.
    """
    message = request.args["message"]
    p = subprocess.run([
        "curl",
        "-s",
        # --form-string sends the value verbatim. With plain --form the
        # caller-supplied text is parsed as a curl form spec, so quotes
        # and ";" inside the message would be interpreted, not sent.
        "--form-string",
        "text={0}".format(message),
        "--form-string",
        "chat_id=CHATID",
        "https://api.telegram.org/botAPITOKEN/sendMessage"],
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE)
    return p.stdout + p.stderr
def mqtt_publish(room, property, value):
    """ Send a fresh value to the MQTT broker on topic home/<room>/<property> """
    topic = "/".join(("home", room, property))
    # subscribe first, then publish, on the very same topic
    MQTT_CLIENT.subscribe(topic)
    MQTT_CLIENT.publish(topic, value)
def yaml_export_generate():
    """ regenerate yaml export for HomeAssistant (HA)

    Writes one MQTT sensor definition per cached property to
    HA_EXPORT_PATH. Properties whose name starts with debug_ are left out.
    The state topic (also used as unique id) is home/<room>/<property>,
    the same topic layout mqtt_publish() sends to.
    """
    # HA is in czech so we need to translate terms
    trans = {
        "temperature": "teplota",
        "humidity": "vlhkost",
        "RSSI": "WIFI signal",
        "voltage": "napeti baterie",
    }

    # create data structure for HA to accept
    data_ha = []
    for sensor in DATA.values():
        room = sensor["room"]
        for prop in sensor["property"]:
            # ignore debugging properties
            if prop.startswith("debug_"):
                continue
            topic = "home/" + room + "/" + prop
            data_ha.append({
                "platform": "mqtt",
                # untranslated properties keep their original name
                "name": room + " " + trans.get(prop, prop),
                "state_topic": topic,
                "unique_id": topic,
            })

    with open(HA_EXPORT_PATH, 'w') as file:
        yaml.dump(data_ha, file, default_flow_style=False)
def get_persistent_prop(prop):
    """ returns value of a persistent property

    The properties are read from PERSISTENT_PROP_PATH while the in-memory
    copy is still empty and served from memory afterwards.

    Returns the value converted to int.
    Raises KeyError when the property is unknown; errors from opening or
    parsing the file propagate to the caller.
    """
    global PERSISTENT_PROP

    # empty -> load from disk
    if not PERSISTENT_PROP:
        with open(PERSISTENT_PROP_PATH) as file:
            # safe_load: the file holds plain data only, and yaml.load()
            # without an explicit Loader is rejected by current PyYAML.
            # An empty file parses to None, keep a dict in that case.
            PERSISTENT_PROP = yaml.safe_load(file) or {}
        print("loading persistent properties: " + str(PERSISTENT_PROP))

    return int(PERSISTENT_PROP[prop])
def set_persistent_prop(prop, value):
    """ Store value under prop and write all persistent properties back
    to PERSISTENT_PROP_PATH """
    # item assignment mutates the module-level dict in place
    PERSISTENT_PROP[prop] = value
    with open(PERSISTENT_PROP_PATH, 'w') as out:
        yaml.dump(PERSISTENT_PROP, out, default_flow_style=False)
if __name__ == '__main__':
    # connect to the local MQTT broker and start the client's network loop
    # before the HTTP server takes over the main thread
    MQTT_CLIENT.connect("127.0.0.1")
    MQTT_CLIENT.loop_start()
    # listen on all interfaces
    app.run(port=7890, host='0.0.0.0')