Document gives a small example of using Ruideng DPS5020 Digital Power
Supply over Sigrok Python API driver. The example uses a middleware
wrapper, sigrogWrapper.py
, to simplify code in the Python demo.
Table of Content
- Use sigrok-python to control DPS5020 Digital Power Supply
- Sigrok Python API Demo
- Python middleware sigrokWrapper.py
- Module document string
- Module imports
- class Device document string
- class Device constructor
- class Device: config_keys, channel_groups, channels methods
- class Device: getter and setter methods
- class Device keyName2key
- class Device open and close guarded by open -state:
- class Device: output device information string
- class SigrokDriver document string
- class SigrokDriver: manage context
- class SigrokDriver: constructor
- class SigrokDriver: findDevice
- class SigrokDriver: manage device open/close state
- class SigrokDriver: session management
- class SigrokDriver: utilities for session callback functions
- class SigrokDriver: documentation string
- End of module
- References
Sigrok Python bindings is a thin wrapper on top of libsigrok C++ implementation. It has only limited support for diagnostics and just returns an error code without any diagnostics messages. The number of classes/types exposed to user on API interface also is quite large (30-40 classes).
This documents uses a Python module, sigrokWrapper.py
, which is
middleware exposing two classes needed to use a Sigrok Device. It
also wraps errors in lib with Python Exceptions documenting error
context and possible fixes. The sigrokWrapper.py
s documented in the
next chapter.
Using driveString='demo'
to access Sigrok demo device. In this example:
- create SigrokDriver using
driveString='demo'
- open demo device for the
SigrokDriver
created- output device configuration keys
- output device channel groups
- for each channel group output configuration keys of the channel group
import sigrokWrapper
print( f"Create sigork wrapper using driverString='{driverString}'")
demo_drv = sigrokWrapper.SigrokDriver(driver=driverString)
with demo_drv as demo_dev:
# Document string created
print( f"Device {demo_dev}")
print( f" config_keys={demo_dev.config_keys()}")
print( f"Channel-groups={demo_dev.channel_groups()}")
for grp in demo_dev.channel_groups():
print( f" Channel -group {grp}: config_keys={demo_dev.config_keys(channel_group=grp)}")
print( f"Channels={demo_dev.channels()}")
Create sigork wrapper using driverString='demo' Device , model :Demo device, version: - 13 channels: D0, D1, D2, D3, D4, D5, D6, D7, A0, A1, A2, A3, A4 config_keys=['CONTINUOUS', 'LIMIT_FRAMES', 'LIMIT_SAMPLES', 'LIMIT_MSEC', 'TRIGGER_MATCH', 'AVG_SAMPLES', 'AVERAGING', 'CAPTURE_RATIO', 'SAMPLERATE'] Channel-groups=['A0', 'A1', 'A2', 'A3', 'A4', 'Analog', 'Logic'] Channel -group A0: config_keys=['OFFSET', 'MEASURED_QUANTITY', 'AMPLITUDE', 'PATTERN_MODE'] Channel -group A1: config_keys=['OFFSET', 'MEASURED_QUANTITY', 'AMPLITUDE', 'PATTERN_MODE'] Channel -group A2: config_keys=['OFFSET', 'MEASURED_QUANTITY', 'AMPLITUDE', 'PATTERN_MODE'] Channel -group A3: config_keys=['OFFSET', 'MEASURED_QUANTITY', 'AMPLITUDE', 'PATTERN_MODE'] Channel -group A4: config_keys=['OFFSET', 'MEASURED_QUANTITY', 'AMPLITUDE', 'PATTERN_MODE'] Channel -group Analog: config_keys=['OFFSET', 'AMPLITUDE'] Channel -group Logic: config_keys=['PATTERN_MODE'] Channels=['D0', 'D1', 'D2', 'D3', 'D4', 'D5', 'D6', 'D7', 'A0', 'A1', 'A2', 'A3', 'A4']
Warning Python API seems to be returning C++ constant names, and not
string values naming device configuration options. Use sigrok-cli
--show
-command for the string values. For example, for the demo
-device
sigrok-cli --driver=demo --show
Driver functions: Demo device Logic analyzer Oscilloscope Scan options: logic_channels analog_channels limit_frames demo - Demo device with 13 channels: D0 D1 D2 D3 D4 D5 D6 D7 A0 A1 A2 A3 A4 Channel groups: Logic: channels D0 D1 D2 D3 D4 D5 D6 D7 Analog: channels A0 A1 A2 A3 A4 A0: channel A0 A1: channel A1 A2: channel A2 A3: channel A3 A4: channel A4 Supported configuration options across all channel groups: continuous: on, off limit_samples: 0 (current) limit_time: 0 (current) limit_frames: 0 (current) samplerate (1 Hz - 1 GHz in steps of 1 Hz) averaging: on, off (current) avg_samples: 0 (current) Supported triggers: 0 1 r f e captureratio: 20 (current)
Using driverString="rdtech-dps:conn=/dev/ttyUSB0"
for the code in
the previous chapter documents configuration options for RDTech
Digital Power Supply DPS5020 as follows:
: Create sigork wrapper using driverString='rdtech-dps:conn=/dev/ttyUSB0' : Device RDTech, model :DPS5020, version: v16 - 3 channels: V, I, P : config_keys=['CONTINUOUS', 'LIMIT_SAMPLES', 'LIMIT_MSEC', 'REGULATION', 'OVER_CURRENT_PROTECTION_THRESHOLD', 'OVER_CURRENT_PROTECTION_ACTIVE', 'OVER_VOLTAGE_PROTECTION_THRESHOLD', 'OVER_VOLTAGE_PROTECTION_ACTIVE', 'ENABLED', 'CURRENT_LIMIT', 'CURRENT', 'VOLTAGE_TARGET', 'VOLTAGE'] : Channel-groups=[] : Channels=['V', 'I', 'P']
Again, Use sigrok-cli
--show
-command for the configuration option names. For
example, for the DPS5020 use the command
sigrok-cli --driver=rdtech-dps:conn=/dev/ttyUSB0 --show
Driver functions: Power supply Scan options: conn serialcomm modbusaddr rdtech-dps - RDTech DPS5020 v16 with 3 channels: V I P Supported configuration options: continuous: on, off limit_samples: 0 (current) limit_time: 0 (current) voltage: 0.000000 (current) voltage_target: 0.000000, 50.000000, 0.010000 current: 0.000000 (current) current_limit: 0.000000, 20.000000, 0.010000 enabled: on, off (current) regulation: CV (current) ovp_active: on, off (current) ovp_threshold: 51.000000 (current) ocp_active: on, off (current) ocp_threshold: 20.200000 (current)
import sigrokWrapper
print( f"Create Sigork wrapper driver using driverString='{driverString}'")
dps_drv = sigrokWrapper.SigrokDriver(driver=driverString)
with dps_drv as dev:
status = dev.get('enabled')
print( f"dev {dev} enabled {status}")
time.sleep(1)
dev.set( "enabled", not status )
print( f"dev {dev} enabled {dev.get('enabled')}")
Create Sigork wrapper driver using driverString='rdtech-dps:conn=/dev/ttyUSB0' dev RDTech, model :DPS5020, version: v16 - 3 channels: V, I, P enabled True dev RDTech, model :DPS5020, version: v16 - 3 channels: V, I, P enabled False
Define function ramp_up
, which steps up voltge from 0V to 5V in 1/STEP V increments.
import sigrokWrapper
import time
from datetime import datetime
def ramp_up( drv, TARGET=5, STEP=5 ):
"""
Ramp up voltage from 0 to TARGET with number of STEPs
:drv: sigrok
:TARGET: target voltage
:STEP: time step 1/STEP s, voltage step TARGET/STEP V
"""
# Ramp up voltage
with dps_drv as dev:
# start from 0V
dev.set( "enabled", False)
dev.set( "voltage_target", 0.0)
# let it settle
time.sleep(1)
dev.set( "enabled", True)
for v in range (0,TARGET*STEP+1):
voltage = v/STEP
print( f"{datetime.now()}: voltage={voltage}")
dev.set( "voltage_target", voltage)
time.sleep(1/STEP)
# Close output
dev.set( "enabled", False)
dev.set( "voltage_target", 0.0)
Call the function ramp_up
function defined above to create voltage
ramp 0-5V.
print( f"Using driverString='{driverString}'")
ramp_up(sigrokWrapper.SigrokDriver(driver=driverString))
Using driverString='rdtech-dps:conn=/dev/ttyUSB0' 2023-03-07 13:56:33.455057: voltage=0.0 2023-03-07 13:56:34.119487: voltage=0.2 2023-03-07 13:56:34.583753: voltage=0.4 2023-03-07 13:56:35.048099: voltage=0.6 2023-03-07 13:56:35.512535: voltage=0.8 2023-03-07 13:56:35.976632: voltage=1.0 2023-03-07 13:56:36.440767: voltage=1.2 2023-03-07 13:56:36.904845: voltage=1.4 2023-03-07 13:56:37.369555: voltage=1.6 2023-03-07 13:56:37.833419: voltage=1.8 2023-03-07 13:56:38.297698: voltage=2.0 2023-03-07 13:56:38.762092: voltage=2.2 2023-03-07 13:56:39.226353: voltage=2.4 2023-03-07 13:56:39.690723: voltage=2.6 2023-03-07 13:56:40.155514: voltage=2.8 2023-03-07 13:56:40.619328: voltage=3.0 2023-03-07 13:56:41.083750: voltage=3.2 2023-03-07 13:56:41.547862: voltage=3.4 2023-03-07 13:56:42.012490: voltage=3.6 2023-03-07 13:56:42.476458: voltage=3.8 2023-03-07 13:56:42.940868: voltage=4.0 2023-03-07 13:56:43.405043: voltage=4.2 2023-03-07 13:56:43.869375: voltage=4.4 2023-03-07 13:56:44.333244: voltage=4.6 2023-03-07 13:56:44.797901: voltage=4.8 2023-03-07 13:56:45.262222: voltage=5.0
Wrap function ramp_up
into a Python thread to ramp up voltage from
0..3V with 0.25V (1/4) increments.
from threading import Thread
dps_driver=sigrokWrapper.SigrokDriver(driver=driverString)
print( f"{datetime.now()} - main create thread t1")
t1 = Thread(target=ramp_up, kwargs={'drv':dps_driver, 'TARGET':3, 'STEP':4})
print( f"{datetime.now()} - main start thread t1")
t1.start()
print( f"{datetime.now()} - main join thread t1")
t1.join()
print( f"{datetime.now()} - main return from join thread t1")
2023-03-07 13:54:05.610069 - main create thread t1 2023-03-07 13:54:05.610221 - main start thread t1 2023-03-07 13:54:05.610574 - main join thread t1 2023-03-07 13:54:08.375299: voltage=0.0 2023-03-07 13:54:09.089735: voltage=0.25 2023-03-07 13:54:09.759203: voltage=0.5 2023-03-07 13:54:10.473189: voltage=0.75 2023-03-07 13:54:11.142815: voltage=1.0 2023-03-07 13:54:11.856900: voltage=1.25 2023-03-07 13:54:12.526750: voltage=1.5 2023-03-07 13:54:13.240729: voltage=1.75 2023-03-07 13:54:13.909973: voltage=2.0 2023-03-07 13:54:14.623952: voltage=2.25 2023-03-07 13:54:15.293809: voltage=2.5 2023-03-07 13:54:16.007700: voltage=2.75 2023-03-07 13:54:16.677083: voltage=3.0 2023-03-07 13:54:18.740140 - main return from join thread t1
THIS DOES NOT WORK - SEEMS TO BE DROPPING PACKAGES IN CALLBACK
because driver reports error messages (ref chapter ‘Run two threads and collect data’ below)
sr: rigol-ds: Read should have been completed
Documentation of from Rigol DS1104Z oscilloscope using sigrokWrapper code presented above:
: Create sigork wrapper using driverString='rigol-ds:conn=tcp-raw/192.168.120.56/5555' : Device Rigol, model :DS1104Z, version: 00.04.04.SP3 - 4 channels: CH1, CH2, CH3, CH4 : config_keys=['LIMIT_FRAMES', 'DATA_SOURCE', 'TRIGGER_LEVEL', 'NUM_HDIV', 'TIMEBASE', 'HORIZ_TRIGGERPOS', 'TRIGGER_SOURCE', 'TRIGGER_SLOPE', 'SAMPLERATE'] : Channel-groups=['CH1', 'CH2', 'CH3', 'CH4'] : Channel -group CH1: config_keys=['PROBE_FACTOR', 'NUM_VDIV', 'COUPLING', 'VDIV'] : Channel -group CH2: config_keys=['PROBE_FACTOR', 'NUM_VDIV', 'COUPLING', 'VDIV'] : Channel -group CH3: config_keys=['PROBE_FACTOR', 'NUM_VDIV', 'COUPLING', 'VDIV'] : Channel -group CH4: config_keys=['PROBE_FACTOR', 'NUM_VDIV', 'COUPLING', 'VDIV'] : Channels=['CH1', 'CH2', 'CH3', 'CH4']
Define a function, which collects measurements from Rigol oscilloscope.
def run_measurements(drv, callback):
"""
:drv: sigrokWrapper driver object
:callback: function to call to collect measurements
"""
logging.info( f"{datetime.now()} - run_measurements - called callback={callback}")
with drv as dev:
logging.info( f"{datetime.now()} - run_measurements - add {dev}")
sigrokWrapper.SigrokDriver.session_add_device(dev)
logging.info( f"{datetime.now()} - run_measurements - session_start")
started = sigrokWrapper.SigrokDriver.session_start(callback)
logging.info( f"{datetime.now()} - run_measurements - enter session_run")
runned = sigrokWrapper.SigrokDriver.session_run()
logging.info( f"{datetime.now()} - run_measurements - returned session_run")
# sigrokWrapper.Sigrok.session_stop()
from threading import Thread
import time
from datetime import datetime
# Start logging
import logging
# logging.basicConfig(level=logging.INFO)
# logging.basicConfig(level=logging.DEBUG)
logging.info( f"{datetime.now()} - main - started")
# Driver strings
RIGOL="rigol-ds:conn=tcp-raw/192.168.120.56/5555"
DPS="rdtech-dps:conn=/dev/ttyUSB0"
import sigrokWrapper
dps_drv = sigrokWrapper.SigrokDriver(driver=DPS)
skooppi_drv = sigrokWrapper.SigrokDriver(driver=RIGOL)
# Collect data
data = {}
# Callback function
def datafeed_in( device, packet ):
logging.info( f"{datetime.now()} - datafeed_in - device={device.model}, packet type {packet.type} ")
# print( f"{datetime.now()} - datafeed_in - device={device.model}, packet type {packet.type} ")
sigrokWrapper.SigrokDriver.parsePacketData(packet=packet, data=data)
logging.debug( f"{datetime.now()} - datafeed_in - { [ k + ':' + str(type(v)) for k,v in data.items()] }")
# setup scope
with skooppi_drv as skooppi_dev:
# skooppi_dev.set( "probe_factor", 10, channel_group="CH1")
# logging.info( f"{datetime.now()} - run_measurements - before samplerate={skooppi_dev.get('samplerate')}")
# skooppi_dev.set( "data_source", 'Memory')
skooppi_dev.set( "data_source", 'Live')
logging.info( f"{datetime.now()} - main - {skooppi_dev.get('data_source')}")
# Not supported
# skooppi_dev.set( "samplerate", 10)
# logging.info( f"{datetime.now()} - run_measurements after samplerate={skooppi_dev.get('samplerate')}")
pass
# Create session object
logging.info( f"{datetime.now()} - run_measurements - session_create")
sigrokWrapper.SigrokDriver.session_create()
# Define threds
t_drive = Thread(target=ramp_up, kwargs={'drv':dps_driver, 'TARGET':3, 'STEP':4})
t_measure = Thread(target=run_measurements, kwargs={'drv':skooppi_drv, 'callback': datafeed_in})
# Start threds
t_measure.start()
t_drive.start()
# Wait for driving thread (=dps ramp up) to finish
logging.info( f"{datetime.now()} - main - before join t_drive")
t_drive.join()
logging.info( f"{datetime.now()} - main - after join t_drive")
# Stop session = stops session in 't_measure' to quit
sigrokWrapper.SigrokDriver.session_stop()
logging.info( f"{datetime.now()} - main - before join t_measure")
t_measure.join()
logging.info( f"{datetime.now()} - main - after join t_measure")
# Cleanup
sigrokWrapper.SigrokDriver.session_close()
print( f"{datetime.now()} - datafeed_in - { [ k + ':' + str(len(v)) for k,v in data.items()] }")
# print( f"{datetime.now()} - datafeed_in - {data}")
2023-03-07 14:28:09.670026: voltage=0.0 2023-03-07 14:28:10.384463: voltage=0.25 2023-03-07 14:28:11.054365: voltage=0.5 2023-03-07 14:28:11.767942: voltage=0.75 2023-03-07 14:28:12.436947: voltage=1.0 2023-03-07 14:28:13.150911: voltage=1.25 2023-03-07 14:28:13.820645: voltage=1.5 2023-03-07 14:28:14.534535: voltage=1.75 2023-03-07 14:28:15.203756: voltage=2.0 sr: rigol-ds: Read should have been completed 2023-03-07 14:28:15.917670: voltage=2.25 2023-03-07 14:28:16.587132: voltage=2.5 2023-03-07 14:28:17.300769: voltage=2.75 2023-03-07 14:28:17.970699: voltage=3.0 2023-03-07 14:28:20.151616 - datafeed_in - ['CH1:1200']
Save data
-dictionary collected above to a CSV-file tmp/demo1.csv
.
import pandas as pd
CSV_FILE="tmp/demo1.csv"
df = pd.DataFrame.from_dict(data)
df.to_csv(CSV_FILE)
Read data frame from CSV-file pics/demo1.csv
and create a line plot:
import pandas as pd
import matplotlib.pyplot as plt
df = pd.read_csv( "pics/demo1.csv", decimal=".")
df.plot( y="CH1")
plt.savefig( "pics/demo1.png", format="png" ) # , dpi=fig.dpi )
"pics/demo1.png"
Python code in sigrokWrapper.py. Put this file into one of the
directies named in sys.path
-list (e.g into current working
directory) and use
import sigrokWrapper
"""Wrap sigrok python API
* sigrok-py hangs Python session if sr.Context created more than
once --> use class leven persistent state 'context'
* sigrok-py complains if device open/close state not managet
correctly && make it easier to make several actions on device -->
add support for python with statement (__enter__, __exit__ -methods)
* key-names must be converted to sigrok internal representation -->
hide this mapping
* wrap error in exceptions to document error context (and to help
user to solve the problem)
* add support for some Python constructs:
* manage device open/close state to support Python with -statement
* drivers/device documentation string
* provide utilites for parsing datacquisition callback (WORK IN PROGRESS)
* expose Sigrok Python API (i.e. no need to use import sigrok.core as sr)
"""
import sigrok.core as sr
import sys
import time
from datetime import datetime
import logging
class Device:
"""Responsibilities:
1) Manage device 'open' state
2) Map keyName to internal key-id in getter&setter
3) Get and set functions
4) __str__ support for the device
"""
Constructor accepts sigrok device instance (in closed state):
# ------------------------------------------------------------------
# Constructor
def __init__( self, device):
self.device = device # sigrok device
self.isOpen = False # we are managing device state, initially 'Closed'
Methods to document device properties. Properties can be set on device or on channel_groups.
# Warning 'key.name' returned cannot be used for a keyName
# use sigrok-cli to find key names. For exampe, for the demo device
#
# sigrok-cli --driver=demo --show
def config_keys( self, channel_group=None ):
if not channel_group is None:
try:
configObject = self.device.channel_groups[channel_group]
except KeyError as err:
raise KeyError(f"Invalid channel_group '{channel_group}'. Expect one of {[ k for k,v in self.device.channel_groups.items()]}") from err
else:
configObject = self.device
config_key_names = [ key.name for key in configObject.config_keys() ]
return config_key_names
def channel_groups( self ):
channel_group_names = [ gr for gr in self.device.channel_groups ]
return channel_group_names
def channels( self ):
channel_names = [ ch.name for ch in self.device.channels ]
return channel_names
Get and set methods dispatching actions on device or on channel group.
# Device getters and setters
def get(self, keyName, channel_group=None):
"""
:channel_group: access 'keyName' on channel_group if given
"""
logging.info( f"get: keyName={keyName}, channel_group={channel_group}")
if self.isOpen:
if not channel_group is None:
try:
channel_group_obj = self.device.channel_groups[channel_group]
except KeyError as err:
raise KeyError(f"Invalid channel_group '{channel_group}'. Expect one of {[ k for k,v in self.device.channel_groups.items()]}") from err
return self.getConfigObject( keyName=keyName, configObject=channel_group_obj)
else:
return self.getConfigObject( keyName=keyName, configObject=self.device)
else:
print( f"Device {self} not open", file=sys.stderr)
return None
def set(self, keyName, value, channel_group=None):
"""Set 'keyName' to 'value' on device or on 'channel_group'
:channel_group: access 'keyName' on channel_group if given
"""
logging.info( f"set: keyName={keyName}, value={value}, channel_group={channel_group}")
if self.isOpen:
if not channel_group is None:
try:
channel_group_obj = self.device.channel_groups[channel_group]
except KeyError as err:
raise KeyError(f"Invalid channel_group '{channel_group}'. Expect one of {[ k for k,v in self.device.channel_groups.items()]}") from err
return self.setConfigObject( keyName=keyName, value=value, configObject=channel_group_obj)
else:
return self.setConfigObject( keyName=keyName, value=value, configObject=self.device)
else:
print( f"Device {self} not open - nothin done", file=sys.stderr)
return None
Methods acting on device or on channel group.
def getConfigObject(self, keyName, configObject):
"""
:configObject: sigrok.Configurable (i.e. Device, Channel_Group etc)
"""
try:
key = self.keyName2key(keyName)
except ValueError as err:
valid_key_names = [ key.name for key in configObject.config_keys() ]
raise ValueError( f"Invalid key '{keyName}'. Expect one of: {valid_key_names}") from err
try:
value = configObject.config_get(key)
except ValueError as err:
valid_key_names = [ key.name for key in configObject.config_keys() ]
raise ValueError( f"Error reading '{keyName}'. Expect one of: {valid_key_names}") from err
return value
def setConfigObject(self, keyName, value, configObject):
try:
key = self.keyName2key(keyName)
except ValueError as err:
valid_key_names = [ key.name for key in configObject.config_keys() ]
raise ValueError( f"Invalid key '{keyName}'. Expect one of: {valid_key_names}") from err
try:
# print( f"settign {value}[{type(value)}]")
ret = configObject.config_set(key,value)
except Exception as err:
capabilities = configObject.config_capabilities(key)
print( f"{keyName}[{key}], capabilities={capabilities}, listable: {sr.Capability.LIST in capabilities:}")
try:
# Not possible to peek for value
if sr.Capability.LIST in capabilities:
valid_values = f" Valid values={ configObject.config_list(key) }"
# valid_values = f" Valid values={ [str(v)+type(v) for v in configObject.config_list(key)]}"
else:
valid_values = ""
except:
valid_values = "exception in valid values"
raise Exception( f"Error in setting '{keyName}' to value {value}[{type(value)}].{valid_values}") from err
return ret
Map keyName
to sigrok identifier.
@staticmethod
def keyName2key(keyName):
"""Map 'keyName' to key sigrok identifier"""
key = sr.ConfigKey.get_by_identifier(keyName)
return key
# ------------------------------------------------------------------
# Open close
def open(self):
if not self.isOpen:
self.isOpen = True
self.device.open()
return self
def close(self):
if self.isOpen:
self.isOpen = False
self.device.close()
# Device information string
def __str__(self):
return f"{self.device.vendor}, model :{self.device.model}, version: {self.device.version} - {len(self.device.channels)} channels: {', '.join([c.name for c in self.device.channels])}"
class SigrokDriver:
"""Wrap sigrok python API
- find device for driver string
- manage device open/close state = support Python with statement
- manage context (class attribute)
- manage session (class attribute)
- session interface (open, start, run, close, clenaup)
- utilities to parse session run callback
Attributes:
* 'driver': sigrok.Driver object
"""
single context state to freezing Python REPL
# Persisent, shared context - one instance to avoid python REPL
# (Read-Eval-Print Loop) from freezing
context = None
# One context managed
session = None
# Contstructore
def __init__( self, driver="rdtech-dps"):
"""Create 'sigrok.context', locate 'driver' (default rdtechDps')
from this context, find device from from
:driver: sigrok driver string used in sigrok-cli
e.g. 'rdtech-dps:conn=/dev/ttyUSB0'
"""
# Create only one 'context' instance in one python session
if SigrokDriver.context is None:
SigrokDriver.context = sr.Context.create()
#
driver_spec = driver.split(':')
driver_name = driver_spec[0]
# locate driver bundle into libsigrok
if driver_name not in self.context.drivers:
raise KeyError( f"Unknown driver name '{driver_name}' in '{driver}'. Supported hardware drivers: {','.join(self.context.drivers.keys())}")
self.driver = self.context.drivers[driver_name]
driver_options = {}
for pair in driver_spec[1:]:
name, value = pair.split('=')
# key = self.driver.ConfigKey.get_by_identifier(name)
# driver_options[name] = key.parse_string(value)
driver_options[name] = value
# attach to sigrok.Device wrapped within Device class
self.device = self.findDevice(self.driver, driver_options)
Scan for first device found
def findDevice(self, driver, driver_options):
"""Locate first (=the one and only?) device found using
driver_options. Stderr message if not found.
:driver: Driver for the device we are looking for
:driver_options: Hash map for drivers options used to locate the
device
:return: Device wrapper for the first sigrok device scanned,
None if not found
"""
scanned = driver.scan(**driver_options)
if len(scanned) > 0:
# return first device found
return Device(scanned[0])
else:
raise ValueError( f"Could not find any device for driver '{self.driver.name}' with options '{ ','.join([k+'='+v for k,v in driver_options.items()])}'")
to open/close device within python with statements. For example:
with dps as dev: # Enable dev.voltage_target = 3.3 dev.enabled = True
# ------------------------------------------------------------------
# support with statement
def open(self):
if self.device is not None:
self.device.open()
return self.device
def close(self):
if self.device is not None:
self.device.close()
def __enter__(self):
"""Called when entering with -statement.
:return: device (which is opened)
"""
return( self.open())
def __exit__( self, *args):
"""Called when exiting with -statement. Close 'device' (if it open)
:return: device (which is opened)
"""
self.close()
# ------------------------------------------------------------------
# support with statement
@staticmethod
def session_create():
if SigrokDriver.session is None:
logging.info( "session created")
SigrokDriver.session = SigrokDriver.context.create_session()
return SigrokDriver.session
@staticmethod
def session_get():
if SigrokDriver.session is None:
raise ValueError( "session_get: Session missing - should have called 'session_create'")
return SigrokDriver.session
@staticmethod
def session_add_device( device ):
session = SigrokDriver.session_get()
session.add_device(device.device)
logging.info( f"added device {device.device}[{type(device.device)}] to session ")
@staticmethod
def session_start( fRun, fStop=lambda device,frame: print( "Stopped" ) ):
session = SigrokDriver.session_get()
logging.debug( f"session_start: called: is_running={session.is_running()}")
if session.is_running():
raise ValueError( f"Session was already running - not started")
# def datafeed_in(device, packet):
# logging.info( f"datafeed_in: packet type {packet.type} ")
# logging.debug( f"datafeed_in: payload methods {dir(packet.payload)} ")
# # print( f"device:{device.name}")
# session.append( device, packet)
# session.begin_save(outputFile)
session.add_datafeed_callback(fRun)
logging.debug( f"session_start: before start is_running={session.is_running()}" )
session.start()
logging.debug( f"session_start: after start is_running={session.is_running()}" )
@staticmethod
def session_run():
""":return: False is already running, else True
"""
session = SigrokDriver.session_get()
logging.debug( f"session_run: called session is_running {session.is_running()}" )
# if session.is_running():
# logging.info( f"session_run: session already running {session.is_running()} - nothing done" )
# return False
logging.info( f"session_run: before session.run, is_running {session.is_running()}" )
session.run()
logging.info( f"session_run: after session.run, is_running {session.is_running()}" )
return True
@staticmethod
def session_stop():
session = SigrokDriver.session_get()
session.stop()
# SigrokDriver.session = None
@staticmethod
def session_close():
SigrokDriver.session = None
# ------------------------------------------------------------------
# utilities to process callback data
@classmethod
def isAnalogPacket(cls, packet):
return packet.type == sr.PacketType.ANALOG
@classmethod
def isLogicPacket(cls, packet):
return packet.type == sr.PacketType.LOGIC
@classmethod
def packetChannels(cls, packet):
return [ch.name for ch in packet.payload._channels() ]
@classmethod
def packetChannels(cls, packet):
return [ch.name for ch in packet.payload._channels() ]
@classmethod
def parsePacketData(cls, packet, data):
logging.info( f"parsePacketData: packet.type={packet.type}")
if cls.isAnalogPacket(packet):
for i, channel in enumerate(cls.packetChannels(packet)):
# previosly unseen channel?
if channel not in data: data[channel] = []
logging.info( f"parsePacketData: channel {channel} {len(packet.payload.data[i])}")
data[channel].extend(packet.payload.data[i])
# data[channel].append(packet.payload.data[i][-1])
elif cls.isLogicPacket(packet):
channel = "logic"
# if channel not in data: data[channel] = []
# logging.info( f"parsePacketData: channel {channel} {len(packet.payload.data)}")
# data[channel].extend(packet.payload.data)
# TODO: collect also logic data (sepately?)
return data
from device information:
# ------------------------------------------------------------------
# print out
def __str__(self):
return str(self.device)
- Sigrok API documentation
- Python API documentation: http://sigrok.org/api/libsigrok/0.5.0/bindings/python/index.html
- C++ API documentation http://sigrok.org/api/libsigrok/0.5.0/bindings/cxx/index.html
- Unofficial re-implemntation of the sigrok-cli tool https://github.com/martinling/sigrok-cli-python
- Sigrok meter (a project using Sigrok Python API): https://github.com/sigrokproject/sigrok-meter
- Module wrapping Sigrok Python API: https://github.com/sigrokproject/sigrok-meter/blob/master/acquisition.py
#
#
Modified:2022-09-23.08:38; # time-stamp