-
Notifications
You must be signed in to change notification settings - Fork 2
/
Copy pathtest.py
161 lines (132 loc) · 5.09 KB
/
test.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
#!/usr/bin/env python3
import asyncio
import time
import yaml
import binascii
import serial_asyncio
from datetime import datetime, timedelta
# All the shared functions are in this package.
from custom_components.sensor.pyvbus.vbuspacket import (
VBUSPacket,
VBUSPacketException
)
# Load configuration.
config = []
with open("0x7321.yaml", 'r', encoding='utf8') as stream:
try:
config = yaml.load(stream)
except yaml.YAMLError as exc:
print(exc)
# Get the config.
message = config.get('message')
class logger():
def error(self, value):
print("Error: %s" % value)
def warning(self, value):
print("Warning: %s" % value)
def warn(self, value):
print("Warn: %s" % value)
def info(self, value):
print("Info: %s" % value)
def debug(self, value):
print("Debug: %s" % value)
class Entity():
"""Fake HASS Entity"""
class ResolVbusSensor(Entity):
"""Representation of a Sensor."""
def __init__(self, name, attributes):
"""Initialize the sensor."""
_LOGGER.debug("Initialising ResolVbusSensor %s" % name)
self._state = None
self._attrsdef = attributes
self._name = name
self._attrs = {}
self._debugmessage = None
self._ttyPort = None
self._filterSource = None
self._filterDestination = None
self._filterCommand = None
@property
def name(self):
"""Return the name of the sensor."""
return self._name
@property
def state(self):
"""Return the state of the sensor."""
return self._state
@property
def device_state_attributes(self):
"""Return the device state attributes."""
return self._attrs
def process_packet(self, vbusPacket):
"""Update new state data for the sensor."""
for item in self._attrsdef:
attrName = item.get('name', '').lower().replace(' ', '_')
attrValue = None
try:
attrFormat = item.get('format', None)
if attrFormat == 'time':
attrValue = vbusPacket.GetTimeValue(item.get('offset',0),item.get('size',2))
elif attrFormat == 'temperature':
attrValue = vbusPacket.GetTemperatureValue(item.get('offset',0),item.get('size',2), item.get('factor', 1))
else:
attrValue = vbusPacket.GetRawValue(item.get('offset',0),item.get('size',2))
self._attrs[attrName] = attrValue
except Exception as e:
_LOGGER.warning("Attribute %s failed: %s" % (attrName, e))
def process_buffer(self, buffer):
"""Fetch new state data for the sensor.
This is the only method that should fetch new data for Home Assistant.
"""
# Run standard update
try:
_LOGGER.debug("Processing buffer: %s" % binascii.hexlify(buffer))
vbusPacket = VBUSPacket(buffer)
self.process_packet(vbusPacket)
except VBUSPacketException as e:
_LOGGER.warning("Update failed: %s" % e)
async def async_update(self):
"""Fetch new state data for the sensor.
This is the only method that should fetch new data for Home Assistant.
"""
# Run standard update
try:
if self._debugmessage is not None:
buffer = bytearray.fromhex(self._debugmessage)
self.process_buffer(buffer)
elif self._ttyPort is not None:
buffer = await self.async_readFromSerial(self._ttyPort, self._filterSource, self._filterDestination, self._filterCommand)
self.process_buffer(buffer)
except VBUSPacketException as e:
_LOGGER.error("Update failed: %s" % e)
async def async_readFromSerial(self, port, source=None, destination=None, command=None):
import serial_asyncio
import serial
reader, writer = await serial_asyncio.open_serial_connection(
url=port,
baudrate=9600
)
await reader.readuntil(b'\xaa') # wait for message start
try:
while True:
buffer = bytearray(b'\xaa') # initialise new buffer
buffer.extend(await reader.readuntil(b'\xaa'))
if len(buffer) >= 5:
header_destination = buffer[1] + buffer[2] * 0x100
header_source = buffer[3] + buffer[4] * 0x100
if source is None or source == header_source:
if destination is None or destination == header_destination:
return buffer[0:len(buffer)-1]
finally:
writer.close()
_LOGGER = logger()
async def async_setup_platform():
"""Setup the sensor platform."""
device = ResolVbusSensor(config.get('name'),
config.get('attributes'))
device._debugmessage = config.get('message')
await device.async_update()
print("%s: %s" % (device.name, device.device_state_attributes))
loop = asyncio.get_event_loop()
buffer = loop.run_until_complete(async_setup_platform())
loop.close()