-
Notifications
You must be signed in to change notification settings - Fork 2
/
Copy pathrobotling_base.py
407 lines (361 loc) · 13.4 KB
/
robotling_base.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
# ----------------------------------------------------------------------------
# robotling_base.py
# Definition of a base class `RobotlingBase`, from which classes that capture
# all functions and properties of a specific board
#
# The MIT License (MIT)
# Copyright (c) 2020-22 Thomas Euler
# 2020-09-04, v1
# 2020-10-31, v1.1, use `languageID` instead of `ID`
# 2020-12-21, v1.2, moved all RGB pixel management here
# 2022-01-02, v1.3, Nano RP2040 Connect added, memory functions refined
# ----------------------------------------------------------------------------
import gc
import array
from micropython import const
from robotling_lib.misc.helpers import TimeTracker
import robotling_lib.misc.ansi_color as ansi
import robotling_lib.robotling_board as rb
from robotling_lib.misc.color_wheel import getColorFromWheel
from robotling_lib.platform.platform import platform as pf
if pf.languageID == pf.LNG_MICROPYTHON:
import time
from machine import Pin
if pf.isRP2:
import robotling_lib.platform.rp2.dio as dio
from robotling_lib.platform.rp2 import busio
else:
import robotling_lib.platform.esp32.dio as dio
from robotling_lib.platform.esp32 import busio
elif pf.languageID == pf.LNG_CIRCUITPYTHON:
import robotling_lib.platform.circuitpython.time as time
from robotling_lib.platform.circuitpython import busio
import robotling_lib.platform.circuitpython.dio as dio
else:
print(ansi.RED +"ERROR: No matching libraries in `platform`." +ansi.BLACK)
# pylint: disable=bad-whitespace
__version__ = "0.1.3.0"
# pylint: enable=bad-whitespace
# ----------------------------------------------------------------------------
class RobotlingBase(object):
"""Robotling base class.
Objects:
-------
- onboardLED : on(), off()
Methods:
-------
- connectToWLAN():
Connect to WLAN if not already connected
- updateStart(), updateEnd()
To be called at the beginning and the end of an update routine
- spin_ms(dur_ms=0, period_ms=-1, callback=None)
Instead of using a timer that calls `update()` at a fixed frequency (e.g.
at 20 Hz), one can regularly, calling `spin()` once per main loop and
everywhere else instead of `time.sleep_ms()`. For details, see there.
- spin_while_moving(t_spin_ms=50)
Call spin frequently while waiting for the current move to finish
- startPulsePixel()
Set color of RGB pixel and enable pulsing
- printReport()
Print statistics of the last run into the history
- powerDown()
To be called when the robot shuts down; to be overwritten
Properties:
----------
- memory : Returns allocated and free memory as tuple
- PixelRGB : get and set color (r,g,b tuple or color wheel index)
- dotStarPower : Turns power to DotStar LED, if any, on or off
Internal objects:
----------------
- _MCP3208 : 8-channel 12-bit A/C converter driver
Internal methods:
----------------
- _pulsePixel()
Update pulsing, if enabled
"""
MIN_UPDATE_PERIOD_MS = const(20) # Minimal time between update() calls
APPROX_UPDATE_DUR_MS = const(8) # Approx. duration of the update/callback
HEARTBEAT_STEPS = const(10) # Number of steps for RGB pixel pulsing
def __init__(self, neoPixel=False, MCP3208=False):
""" Initialize spin management
"""
# Get the current time in seconds
self._run_duration_s = 0
self._start_s = time.time()
# Initialize some variables
self._ID = pf.GUID
self._Tele = None
self._MCP3208 = None
self._Pix_enablePulse = False
self.onboardLED = None
self._NPx = None
self._DS = None
self._collect()
if MCP3208:
# Initialize analog sensor driver
from robotling_lib.driver import mcp3208
dev = None if pf.ID == pf.ENV_ESP32_S2 else 1
dev = 0 if pf.isRP2 else dev
self._SPI = busio.SPIBus(rb.SPI_FRQ, rb.SCK, rb.SDO, rb.SDI, spidev=dev)
self._MCP3208 = mcp3208.MCP3208(self._SPI, rb.CS_ADC)
# RGB Pixel management
if neoPixel:
# Initialize Neopixel (connector)
if pf.languageID == pf.LNG_MICROPYTHON:
if pf.isRP2:
from robotling_lib.platform.rp2.neopixel import NeoPixel
else:
from robotling_lib.platform.esp32.neopixel import NeoPixel
elif pf.languageID == pf.LNG_CIRCUITPYTHON:
from robotling_lib.platform.circuitpython.neopixel import NeoPixel
self._NPx = NeoPixel(rb.NEOPIX, 1)
self._NPx.set((0,0,0), 0, True)
s = "[{0:>12}] {1:35}".format("Neopixel", "ready")
print(ansi.GREEN +s +ansi.BLACK)
if rb.DS_CLOCK:
# Initialize onboard RGB LED, if present
from robotling_lib.driver.dotstar import DotStar
self._stateDS = True
self.dotStarPower = self._stateDS
self._DS = DotStar(rb.DS_CLOCK, rb.DS_DATA, 1, brightness=0.5)
self._DS[0] = 0
if self._DS or self._NPx:
# Initialize pulse management
self._Pix_enablePulse = True
self._Pix_iColor = 0
self._Pix_RGB = bytearray([0]*3)
self._Pix_curr = array.array("i", [0,0,0])
self._Pix_step = array.array("i", [0,0,0])
self._Pix_fact = 1.0
self._Pix_pulse = False
self.PixelRGB = 0
# Initialize on-board (feather) hardware
if rb.RED_LED:
self.onboardLED = dio.DigitalOut(rb.RED_LED)
# Initialize spin function-related variables
self._spin_period_ms = 0
self._spin_t_last_ms = 0
self._spin_callback = None
self._spinTracker = TimeTracker()
self._collect()
# - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
def _collect(self):
gc.collect()
@property
def memory(self):
return (gc.mem_alloc(), gc.mem_free())
@property
def ID(self):
return self._ID
# - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
def powerDown(self):
""" Record run time
"""
if self._NPx:
self._NPx.set((0,0,0), 0, True)
if self._DS:
self._DS[0] = 0
self.dotStarPower = False
self._run_duration_s = time.time() -self._start_s
# - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
@property
def dotStarPower(self):
return self._stateDS
@dotStarPower.setter
def dotStarPower(self, state):
if not rb.DS_POWER:
return
if state:
Pin(rb.DS_POWER, Pin.OUT, None)
Pin(rb.DS_POWER).value(False)
else:
Pin(rb.DS_POWER, Pin.IN, Pin.PULL_HOLD)
Pin(rb.DS_CLOCK, Pin.OUT if state else Pin.IN)
Pin(rb.DS_DATA, Pin.OUT if state else Pin.IN)
time.sleep_ms(35)
self._stateDS = state
# - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
def updateStart(self):
""" To be called at the beginning of the update function
"""
self._spinTracker.reset()
if self._MCP3208:
self._MCP3208.update()
if self._Pix_enablePulse:
self._pulsePixel()
def updateEnd(self):
""" To be called at the end of the update function
"""
if self._spin_callback:
self._spin_callback()
self._spinTracker.update()
def spin_ms(self, dur_ms=0, period_ms=-1, callback=None):
""" If not using a Timer to call `update()` regularly, calling `spin()`
once per main loop and everywhere else instead of `time.sleep_ms()`
is an alternative to keep the robotling board updated.
e.g. "spin(period_ms=50, callback=myfunction)"" is setting it up,
"spin(100)"" (~sleep for 100 ms) or "spin()" keeps it running.
"""
if self._spin_period_ms > 0:
p_ms = self._spin_period_ms
p_us = p_ms *1000
d_us = dur_ms *1000
if dur_ms > 0 and dur_ms < (p_ms -APPROX_UPDATE_DUR_MS):
time.sleep_ms(int(dur_ms))
elif dur_ms >= (p_ms -APPROX_UPDATE_DUR_MS):
# Sleep for given time while updating the board regularily; start by
# sleeping for the remainder of the time to the next update ...
t_us = time.ticks_us()
dt_ms = time.ticks_diff(time.ticks_ms(), self._spin_t_last_ms)
if dt_ms > 0 and dt_ms < p_ms:
time.sleep_ms(dt_ms)
# Update
self.update()
self._spin_t_last_ms = time.ticks_ms()
# Check if sleep time is left ...
d_us = d_us -int(time.ticks_diff(time.ticks_us(), t_us))
if d_us <= 0:
return
# ... and if so, pass the remaining time by updating at regular
# intervals
while time.ticks_diff(time.ticks_us(), t_us) < (d_us -p_us):
time.sleep_us(p_us)
self.update()
# Remember time of last update
self._spin_t_last_ms = time.ticks_ms()
else:
# No sleep duration given, thus just check if time is up and if so,
# call update and remember time
d_ms = time.ticks_diff(time.ticks_ms(), self._spin_t_last_ms)
if d_ms > self._spin_period_ms:
self.update()
self._spin_t_last_ms = time.ticks_ms()
elif period_ms > 0:
# Set up spin parameters and return
self._spin_period_ms = period_ms
self._spin_callback = callback
self._spinTracker.reset(period_ms)
self._spin_t_last_ms = time.ticks_ms()
else:
# Spin parameters not setup, therefore just sleep
time.sleep_ms(dur_ms)
# - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
def connectToWLAN(self):
""" Connect to WLAN if not already connected
"""
if pf.ID in [pf.ENV_ESP32_UPY, pf.ENV_ESP32_TINYPICO, pf.ENV_ESP32_S2,
pf.ENV_MPY_RP2_NANOCONNECT]:
import network
from NETWORK import my_ssid, my_wp2_pwd
if not network.WLAN(network.STA_IF).isconnected():
sta_if = network.WLAN(network.STA_IF)
if not sta_if.isconnected():
print('Connecting to network...')
sta_if.active(True)
sta_if.connect(my_ssid, my_wp2_pwd)
while not sta_if.isconnected():
self.onboardLED.on()
time.sleep(0.05)
self.onboardLED.off()
time.sleep(0.05)
print("[{0:>12}] {1}".format("network", sta_if.ifconfig()))
# - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
def printReport(self):
""" Prints a report on memory usage and performance
"""
self.printMemory(report=True)
avg_ms = self._spinTracker.meanDuration_ms
dur_ms = self._spinTracker.period_ms
print("Performance: spin: {0:6.3f}ms @ {1:.1f}Hz ~{2:.0f}%"
.format(avg_ms, 1000/dur_ms, avg_ms /dur_ms *100))
def printMemory(self, do_collect=False, report=False, as_str=False):
""" Prints just the information about memory usage
"""
used, free = self.memory
total = free +used
if do_collect:
self._collect()
used, free1 = self.memory
freed = free1 -free
usedp = used /total *100
tot_kb = total /1024
s1 = "Memory : " if report else ""
s2 = "" if not(do_collect) else " ({0} bytes freed)".format(freed)
s3 = "{0}{1:.0f}% of {2:.0f}kB RAM used{3}.".format(s1, usedp, tot_kb, s2)
if not(as_str):
print(s3)
else:
return s3
def collectMemory(self):
return self.printMemory(True, False, True)
# - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
@property
def PixelRGB(self):
return self._Pix_RGB
@PixelRGB.setter
def PixelRGB(self, value):
""" Set color of RGB LEDs ("Pixel") by assigning a RGB value or a color
wheel index (and stop pulsing, if running)
"""
try:
rgb = bytearray([value[0], value[1], value[2]])
except TypeError:
rgb = getColorFromWheel(value)
if self._NPx:
self._NPx.set(rgb, 0, True)
if self._DS:
self._DS[0] = self._Pix_curr
self._DS.show()
self._Pix_pulse = False
# - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
def startPulsePixel(self, value):
""" Set color of RGB LEDs and enable pulsing
"""
iColPrev = self._Pix_iColor
if self._Pix_enablePulse:
try:
rgb = bytearray([value[0], value[1], value[2]])
except TypeError:
rgb = getColorFromWheel(value)
self._Pix_iColor = value
if (rgb != self._Pix_RGB) or not(self._Pix_pulse):
# New color and start pulsing
c = self._Pix_curr
s = self._Pix_step
c[0] = rgb[0]
s[0] = int(rgb[0] /self.HEARTBEAT_STEPS)
c[1] = rgb[1]
s[1] = int(rgb[1] /self.HEARTBEAT_STEPS)
c[2] = rgb[2]
s[2] = int(rgb[2] /self.HEARTBEAT_STEPS)
self._Pix_RGB = rgb
if self._NPx:
self._NPx.set(rgb, 0, True)
if self._DS:
self._DS[0] = self._Pix_curr
self._DS.show()
self._Pix_pulse = True
self._Pix_fact = 1.0
return iColPrev
def dimPixel(self, factor=1.0):
self._Pix_fact = max(min(1, factor), 0)
def _pulsePixel(self):
""" Update pulsing, if enabled
"""
if self._Pix_pulse:
rgb = self._Pix_RGB
for i in range(3):
self._Pix_curr[i] += self._Pix_step[i]
if self._Pix_curr[i] > (rgb[i] -self._Pix_step[i]):
self._Pix_step[i] *= -1
if self._Pix_curr[i] < abs(self._Pix_step[i]):
self._Pix_step[i] = abs(self._Pix_step[i])
if self._Pix_fact < 1.0:
self._Pix_curr[i] = int(self._Pix_curr[i] *self._Pix_fact)
self._Pix_curr[i] = min(self._Pix_curr[i], 255)
if self._NPx:
self._NPx.set(self._Pix_curr, 0, True)
if self._DS:
self._DS[0] = self._Pix_curr
self._DS.show()
# ----------------------------------------------------------------------------