-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathmain.py
124 lines (107 loc) · 3.49 KB
/
main.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
import os
from multiprocessing import Process
import time
import redis
from logs import Logger
logger = Logger(__name__)
try:
from RPi import GPIO # pylint: disable=import-error
except RuntimeError:
logger.fatal("Error importing RPi.GPIO! "
" This is probably because you need superuser privileges. "
"You can achieve this by using 'sudo' to run your script")
raise
GARAGE_SENSOR_PIN = 4
GARAGE_CONTROL_PIN = 17
try:
REDIS_HOST = os.environ['REDIS_HOST']
except KeyError:
print('REDIS_HOST not set')
raise
# TODO: May need to put some safety checks in here to
# make sure garage does not go haywire if pub/sub goes crazy
def listener():
"""
Listen to redis channel for garage door open/close request
"""
r = redis.Redis(
host=REDIS_HOST,
port=6379,
decode_responses=True
)
mobile = r.pubsub()
mobile.subscribe('garage-request')
for message in mobile.listen():
if message['data'] == 'open':
if read_garage_door_sensor() == 1:
print("Garage is open already! request to open has been ignored")
logger.info("Garage is open already! request to open has been ignored")
r.publish('garage-state', 'opened')
else:
print("Opening Garage!")
logger.info("Opening Garage!")
r.publish('garage-state', 'opening')
garage_relay()
r.publish('garage-state', 'opened')
elif message['data'] == 'close':
if read_garage_door_sensor() != 1:
logger.info("Garage is closed already! Request to closed has been ignored")
r.publish('garage-state', 'closed')
else:
logger.info("Closing Garage!")
r.publish('garage-state', 'closing')
garage_relay()
r.publish('garage-state', 'closed')
elif message['data'] == 1:
logger.info('got 1 returned meaning pub/sub has been initialized')
else:
logger.info(f'Invalid message {message}')
def garage_relay():
"""
Low level controls to open/close garage
"""
logger.info('TESTING >>>>>> doing action')
return 1
GPIO.setmode(GPIO.BCM)
GPIO.setup(GARAGE_CONTROL_PIN, GPIO.OUT)
GPIO.setwarnings(False)
GPIO.output(GARAGE_CONTROL_PIN, GPIO.HIGH)
time.sleep(1)
GPIO.output(GARAGE_CONTROL_PIN, GPIO.LOW)
GPIO.cleanup(GARAGE_CONTROL_PIN)
def read_garage_door_sensor():
"""
Read garage door sensor
"""
GPIO.setmode(GPIO.BCM)
GPIO.setup(GARAGE_SENSOR_PIN, GPIO.IN, pull_up_down=GPIO.PUD_UP)
val = GPIO.input(GARAGE_SENSOR_PIN)
logger.info(f"read state is: {val}")
return val
def send_state_to_redis(garage_state):
"""
Send garage state to redis
"""
r = redis.Redis(
host=REDIS_HOST,
port=6379,
decode_responses=True
)
r.set('garage-state', garage_state)
logger.info(f'RPI::: Sending Garage state to redis::: Garage State: {garage_state}')
def read_and_send_state_to_redis():
"""
Read garage door sensor and send state to redis
"""
while True:
state = read_garage_door_sensor()
if state == 0:
send_state_to_redis('closed')
else:
send_state_to_redis('opened')
time.sleep(2)
if __name__ == '__main__':
p1 = Process(target=listener)
p2 = Process(target=read_and_send_state_to_redis)
p1.start()
p2.start()