-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathsense-monitor-to-influxdb.py
55 lines (46 loc) · 1.66 KB
/
sense-monitor-to-influxdb.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
import asyncio
from decouple import config
from influxdb_client import InfluxDBClient, Point
from retry import retry
from sense_energy import ASyncSenseable
# --- Runtime settings -------------------------------------------------------
# Each value is looked up by name through decouple's `config`. No default is
# passed, so every setting is expected to be provided by the deployment.
# NOTE(review): presumably a missing setting makes `config` raise at import
# time -- confirm against the python-decouple documentation.

# Label written as the "monitor_name" tag on every point.
sense_monitor_name = config("SENSE_MONITOR_NAME")
# Credentials passed to the Sense client's authenticate() call.
sense_username = config("SENSE_USERNAME")
sense_password = config("SENSE_PASSWORD")
# Connection details and write destination for InfluxDB.
influxdb_url = config("INFLUXDB_URL")
influxdb_token = config("INFLUXDB_TOKEN")
influxdb_org = config("INFLUXDB_ORG")
influxdb_bucket = config("INFLUXDB_BUCKET")
# One client shared by the whole script. The pyright suppressions are kept
# from the original source, which silences a type-checker complaint on each
# of these arguments.
influxdb_client = InfluxDBClient(
url=influxdb_url, # pyright: ignore [reportGeneralTypeIssues]
token=influxdb_token, # pyright: ignore [reportGeneralTypeIssues]
org=influxdb_org, # pyright: ignore [reportGeneralTypeIssues]
)
# Write handle created with the client's default write options (no arguments
# are passed); it is the object used to send points to InfluxDB.
write_api = influxdb_client.write_api()
def realtime_callback(payload):
    """Convert one realtime payload into InfluxDB points and write them.

    Args:
        payload: Mapping with a ``"devices"`` entry (an iterable of mappings
            that each provide ``"id"``, ``"name"`` and ``"w"``) and a
            ``"channels"`` entry (an iterable of per-channel readings).

    One ``sense_devices`` point is built per device and one ``sense_channel``
    point per channel (tagged with its position in the list). Every point
    carries the configured monitor name as a tag and stores its reading in
    the ``watts`` field. All points are sent in a single write call.
    """
    device_points = [
        Point("sense_devices")
        .tag("monitor_name", sense_monitor_name)
        .tag("id", entry["id"])
        .tag("name", entry["name"])
        .field("watts", entry["w"])
        for entry in payload["devices"]
    ]
    channel_points = [
        Point("sense_channel")
        .tag("monitor_name", sense_monitor_name)
        .tag("channel", position)
        .field("watts", reading)
        for position, reading in enumerate(payload["channels"])
    ]
    # Devices first, then channels, submitted together as one batch.
    write_api.write(
        influxdb_bucket,  # pyright: ignore [reportGeneralTypeIssues]
        influxdb_org,  # pyright: ignore [reportGeneralTypeIssues]
        device_points + channel_points,
    )
async def main():
    """Authenticate with Sense and stream realtime updates to InfluxDB.

    Creates a new ``ASyncSenseable`` client, logs in with the configured
    credentials, and then awaits the realtime stream, handing each update to
    ``realtime_callback``. Exceptions raised by authentication or by the
    stream propagate to the caller.
    """
    sense = ASyncSenseable()
    await sense.authenticate(sense_username, sense_password)
    await sense.async_realtime_stream(realtime_callback)


# `retry` wraps an ordinary synchronous call. Placed directly on the coroutine
# function it only guarded the creation of the coroutine object (which never
# fails), so errors raised later inside `asyncio.run` bypassed it entirely and
# the script exited on the first failure. Decorating this synchronous wrapper
# puts the whole run inside the retry loop. A capped exponential backoff keeps
# a persistent failure (bad credentials, service outage) from turning into a
# zero-delay busy loop.
@retry(delay=1, backoff=2, max_delay=60)
def _run_with_retry():
    """Run ``main`` to completion on a fresh event loop, retrying on errors."""
    asyncio.run(main())


if __name__ == "__main__":
    _run_with_retry()