Skip to content

Commit

Permalink
Merge branch 'robot-manager' of https://github.com/sonelu/roboglia in…
Browse files Browse the repository at this point in the history
…to robot-manager
  • Loading branch information
sonelu committed May 19, 2020
2 parents 33f6117 + 974b533 commit b492fa1
Show file tree
Hide file tree
Showing 10 changed files with 405 additions and 30 deletions.
30 changes: 27 additions & 3 deletions roboglia/base/device.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,8 @@
import yaml
import logging

from ..utils import get_registered_class, check_not_empty, check_type
from ..utils import get_registered_class, check_not_empty, \
check_type, check_key

from .bus import BaseBus, SharedBus

Expand Down Expand Up @@ -124,17 +125,40 @@ def __init__(self, name='DEVICE', bus=None, dev_id=None, model=None,
model_ini = BaseDevice.cache[model_file]
self.__registers = {}
self.__reg_by_addr = {}
clones = []
for reg_name, reg_info in model_ini['registers'].items():
# add name to the dictionary
reg_info['name'] = reg_name
reg_info['device'] = self
if reg_info.get('clone', False):
# we register clones at the end after we have the main
# registers so that we can refer to them
clones.append(reg_info)
else:
reg_class_name = reg_info.get('class', self.default_register())
reg_class = get_registered_class(reg_class_name)
new_register = reg_class(**reg_info)
# we add as an attribute of the register too
self.__dict__[reg_name] = new_register
self.__registers[reg_name] = new_register
self.__reg_by_addr[reg_info['address']] = new_register
# now the clones
for reg_info in clones:
# check that the register address is covered by a main register
check_key('address', reg_info, 'register', reg_info['name'],
logger)
check_key(reg_info['address'], self.__reg_by_addr, 'register',
reg_info['name'], logger,
f'no main register with address {reg_info["address"]} '
'defined')
reg_info['clone'] = self.register_by_address(reg_info['address'])
reg_name = reg_info['name']
reg_class_name = reg_info.get('class', self.default_register())
reg_class = get_registered_class(reg_class_name)
reg_info['device'] = self
new_register = reg_class(**reg_info)
# we add as an attribute of the register too
self.__dict__[reg_name] = new_register
self.__registers[reg_name] = new_register
self.__reg_by_addr[reg_info['address']] = new_register
self.__inits = inits

@property
Expand Down
33 changes: 33 additions & 0 deletions roboglia/base/devices/DUMMY.yml
Original file line number Diff line number Diff line change
Expand Up @@ -98,3 +98,36 @@ registers:
maxim: 160
factor: 10.0
default: 120

status:
address: 99
default: 0b01010101

status_unmasked:
address: 99
clone: True
access: RW
class: BoolRegister


status_one:
address: 99
clone: True
class: BoolRegister
mask: 0b00000001


status_2and3:
address: 99
clone: True
access: RW
class: BoolRegister
mask: 0b00000110
mode: all

status_2or3:
address: 99
clone: True
class: BoolRegister
mask: 0b00000110
mode: any
144 changes: 119 additions & 25 deletions roboglia/base/register.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,26 @@ class BaseRegister():
access: str
Read ('R') or read-write ('RW'); default 'R'
clone: BaseRegister or subclass or ``None``
Indicates if the register is a clone; this value provides the
reference to the register object that acts as the main register
in interation with the communication bus. This allows you to define
multiple represtnations of the same physical register (at a given
address) with the purpose of having different external
representations. For example:
- you can have a position register that can provide the external
value in degrees or radians,
- a velocity register that can provide the external value in degrees
per second, radians per second or rotations per minute,
- a byte register that reads 8 inputs and mask them each as a
:py:class:`BoolRegister` with a different bit mask
In the device definition YAML file use ``True`` to indicate if a
register is a clone. The device constructor will replace the reference
of the main register with the same address in the constructor of this
register.
sync: bool
``True`` if the register will be updated from the real device
using a sync loop. If `sync` is ``False`` access to the register
Expand All @@ -66,7 +86,7 @@ class BaseRegister():
order: ``LH`` or ``HL``
Applicable only for registers with size > 1 that represent a value
over succesive internal registers, but for convenience are groupped
over successive internal registers, but for convenience are groupped
as one single register with size 2 (or higher).
``LH`` means low-high and indicates the bytes in the registry are
organized starting with the low byte first. ``HL`` indicates that
Expand All @@ -80,18 +100,23 @@ class BaseRegister():
The default value for the register; implicit 0
"""
def __init__(self, name='REGISTER', device=None, address=0, size=1,
minim=0, maxim=None, access='R', sync=False, word=False,
bulk=True, order='LH', default=0, **kwargs):
def __init__(self, name='REGISTER', device=None, address=0, clone=None,
size=1, minim=0, maxim=None, access='R', sync=False,
word=False, bulk=True, order='LH', default=0, **kwargs):
# these are already checked by the device
self.__name = name
# device
check_not_empty(device, 'device', 'register', self.name, logger)
check_type(device, BaseDevice, 'register', self.name, logger)
self.__device = device
# address
# check_not_empty(address, 'address', 'register', self.name, logger)
if address != 0:
check_not_empty(address, 'address', 'register', self.name, logger)
self.__address = address
# clone
if clone:
check_type(clone, BaseRegister, 'register', self.name, logger)
self.__clone = clone
# size
check_not_empty(size, 'size', 'register', self.name, logger)
check_type(size, int, 'register', self.name, logger)
Expand Down Expand Up @@ -140,6 +165,11 @@ def address(self):
"""The register's address in the device."""
return self.__address

@property
def clone(self):
"""Indicates the register is a clone of another."""
return self.__clone

@property
def size(self):
"""The regster's size in Bytes."""
Expand Down Expand Up @@ -217,19 +247,27 @@ def default(self):

@property
def int_value(self):
"""The internal value of the register."""
return self.__int_value
"""Internal value of register, if a clone return the value of the
main register."""
if not self.clone:
return self.__int_value
else:
return self.clone.int_value

@int_value.setter
def int_value(self, value):
"""Allows only :py:class:`BaseSync` derrived classes to set the values
for the ``int_value``."""
for the ``int_value``. If clone, store the value in the main register.
"""
caller = inspect.stack()[1].frame.f_locals['self']
if isinstance(caller, BaseSync):
self.__int_value = value
if isinstance(caller, BaseSync) or isinstance(caller, BaseRegister):
if not self.clone:
self.__int_value = value
else:
self.clone.int_value = value
else:
logger.error('only BaseSync subclasses can chance the '
'internal value ')
'internal value')

def value_to_external(self, value):
"""Converts the presented value to external format according to
Expand Down Expand Up @@ -288,7 +326,7 @@ def value(self):
subclasses to provide different representations of the register's
value (hence the ``any`` return type).
"""
if not self.sync:
if not self.sync and not self.clone:
self.read()
return self.value_to_external(self.int_value)

Expand All @@ -311,7 +349,7 @@ def value(self, value):
# trim according to min and max for the register
if self.access != 'R':
int_value = self.value_to_internal(value)
self.__int_value = max(self.minim, min(self.maxim, int_value))
self.int_value = max(self.minim, min(self.maxim, int_value))
if not self.sync: # pragma: no branch
# direct sync
self.write()
Expand Down Expand Up @@ -347,27 +385,64 @@ def __str__(self):
class BoolRegister(BaseRegister):
"""A register with BOOL representation (true/false).
Inherits from :py:class:`BaseRegister` all methods and defaults ``max``
to 1.
Inherits from :py:class:`BaseRegister` all methods.
Overrides `value_to_external` and `value_to_internal` to process
a bool value.
Parameters
----------
mask: int or ``None``
An optional mask to use in the determination of the output of the
register. Default is None and in this case we simply compare the
internal value with 0.
mode: str ('all' or 'any')
Indicates how the mask should be used: 'all' means all the bits
in the mask must match while 'any'
means any bit that matches the mask is enough to result in a ``True``
external value. Only used if mask is not ``None``. Default is 'any'.
"""
def __init__(self, **kwargs):
if 'maxim' in kwargs: # pragma: no branch
logger.warning('parameter "maxim" for BoolRegister ignored, '
'it will be defaulted to 1')
del kwargs['maxim']
super().__init__(maxim=1, **kwargs)
def __init__(self, mask=None, mode='any', **kwargs):
super().__init__(**kwargs)
if mask:
check_type(mask, int, 'register', self.name, logger)
check_options(mode, ['all', 'any'], 'register', self.name, logger)
self.__mask = mask
self.__mode = mode

@property
def mask(self):
"""The mask used."""
return self.__mask

@property
def mode(self):
"""The bitmasking mode ('all' or 'any')."""
return self.__mode

def value_to_external(self, value):
"""The external representation of bool register.
"""
return bool(value)
if self.mask is None:
return bool(value)
else:
if self.mode == 'any':
return bool(value & self.mask)
elif self.mode == 'all':
return (value & self.mask) == self.mask
else:
raise NotImplementedError

def value_to_internal(self, value):
"""The internal representation of the register's value.
"""
return int(value)
if value:
if self.mask:
return self.mask
else:
return 1
else:
return 0


class RegisterWithConversion(BaseRegister):
Expand All @@ -390,16 +465,29 @@ class RegisterWithConversion(BaseRegister):
offset: int
The offset for the conversion; defaults to 0 (int)
sign_bit: int or None
If a number is given it means that the register is "signed" and that
bit represents the sign. Bits are numbered from 1 meaning that if
``sign_bit`` is 1 the less significant bit is used and if we have
a 2 bytes register the most significant bit would be 16.
The convention is that numbers having 0 in this bit are positive
and the ones having 1 are negative numbers.
Raises:
KeyError: if any of the mandatory fields are not provided
ValueError: if value provided are wrong or the wrong type
"""
def __init__(self, factor=1.0, offset=0, **kwargs):
def __init__(self, factor=1.0, offset=0, sign_bit=None, **kwargs):
super().__init__(**kwargs)
check_type(factor, float, 'register', self.name, logger)
self.__factor = factor
check_type(offset, int, 'register', self.name, logger)
self.__offset = offset
if sign_bit:
check_type(sign_bit, int, 'register', self.name, logger)
self.__sign_bit = pow(2, sign_bit)
else:
self.__sign_bit = None

@property
def factor(self):
Expand All @@ -420,6 +508,9 @@ def value_to_external(self, value):
external = (internal - offset) / factor
"""
if self.__sign_bit and value > (self.__sign_bit / 2):
# negative number
value = value - self.__sign_bit
return (float(value) - self.offset) / self.factor

def value_to_internal(self, value):
Expand All @@ -433,7 +524,10 @@ def value_to_internal(self, value):
The resulting value is rounded to produce an integer suitable
to be stored in the register.
"""
return round(float(value) * self.factor + self.offset)
value = round(float(value) * self.factor + self.offset)
if value < 0 and self.__sign_bit:
value = value + self.__sign_bit
return value


class RegisterWithThreshold(BaseRegister):
Expand Down
7 changes: 7 additions & 0 deletions roboglia/i2c/devices/DUMMY.yml
Original file line number Diff line number Diff line change
Expand Up @@ -30,21 +30,28 @@ registers:
desc: Accelerometer Z Byte Register

word_xl_x:
class: RegisterWithConversion
address: 7
size: 2
access: RW
sign_bit: 16
desc: Accelerometer X Word Register

word_xl_y:
class: RegisterWithConversion
address: 9
size: 2
access: RW
sign_bit: 16
factor: 10.0
desc: Accelerometer Y Word Register

word_xl_z:
class: RegisterWithConversion
address: 11
size: 2
access: RW
sign_bit: 16
desc: Accelerometer Z Word Register

temp:
Expand Down
Loading

0 comments on commit b492fa1

Please sign in to comment.