-
Notifications
You must be signed in to change notification settings - Fork 219
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
First of many steps towards refactoring the anytone.py driver... #638
base: master
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,269 @@ | ||
# Copyright 2023 Dan Smith <[email protected]> | ||
# | ||
# This program is free software: you can redistribute it and/or modify | ||
# it under the terms of the GNU General Public License as published by | ||
# the Free Software Foundation, either version 3 of the License, or | ||
# (at your option) any later version. | ||
# | ||
# This program is distributed in the hope that it will be useful, | ||
# but WITHOUT ANY WARRANTY; without even the implied warranty of | ||
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the | ||
# GNU General Public License for more details. | ||
# | ||
# You should have received a copy of the GNU General Public License | ||
# along with this program. If not, see <http://www.gnu.org/licenses/>. | ||
|
||
from builtins import bytes | ||
import struct | ||
import logging | ||
from abc import ABC, abstractmethod | ||
|
||
from chirp import util, errors | ||
|
||
LOG = logging.getLogger(__name__) | ||
|
||
|
||
# ############################################################################ | ||
# Radio Communication Protocol | ||
# ############################################################################ | ||
|
||
# TODO Consider lifting this to common for all drivers to utilize | ||
class RadioCommunicationProtocol(ABC): | ||
"""Abstract base class for encapsulating the logic that communicates with | ||
the radio -- while remaining agnositic as to what content is being | ||
communicated. | ||
|
||
Usage example (within the download code, similar for upload): | ||
with MyProtocol(pipe, block_size, file_ident) as protocol: | ||
# `protocol.start_session()` is automatically called. | ||
# `protocol.verify_radio_ident()` is automatically called. | ||
|
||
for ... | ||
for ... | ||
block = protocol.read_block(addr) | ||
data += block | ||
|
||
# `protocol.end_session()` is automatically called. | ||
|
||
""" | ||
|
||
def __init__( | ||
self, pipe, block_size: int, file_ident, echos_write=True | ||
) -> None: | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think I've mentioned this on the list, but surely many times in other PRs. I absolutely detest black formatting style (as do many others). I'm sorry you've done this work in this way and will have to reformat it, but that's how it is. I have to deal with it at work, but I get to choose here. These parens on their own lines are one of the ugliest aspects of it, IMHO. Most of the rest of the chirp code is fairly conventional python formatting. Please reformat this to look like the rest of the codebase. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. If not Black, then which tool should I use? Any special settings? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I can't offer you a tool to automatically format the code in more conventional python and chirp style, as you know. You can start by writing code that the current checker (flake8) will allow, and not adding arbitrary vertical stacking of things that will fit perfectly fine on fewer lines. Anywhere there's a closing paren on a line by itself is an easy regex for not-okay-ness :) |
||
"""@file_ident can be a single ID or a list of multiple possible | ||
IDs.""" | ||
assert block_size > 0 and block_size % 2 == 0 | ||
self.pipe = pipe | ||
self.block_size = block_size | ||
self.file_idents = ( | ||
[file_ident] | ||
if isinstance(file_ident, (bytes, str)) | ||
else file_ident | ||
) | ||
self.echos_write = echos_write | ||
|
||
def __enter__(self): | ||
self.start_session() | ||
self._verify_radio_ident() | ||
return self # what gets assigned to the `as` variable | ||
|
||
def __exit__(self, exc_type, exc_val, exc_tb): | ||
# This is called automatically when the `with` statement ends. | ||
# @exc_type,@exc_val, and @exc_tb are the type, value, and traceback | ||
# info, respectively, if an exception was raised within the body of | ||
# the with statement. Otherwise, all three are None. | ||
if exc_type is None: | ||
self.end_session() | ||
return False # do not supress the exception, if any | ||
|
||
def read_bytes(self, length): | ||
"""Reads the @length number of bytes from the pipe and returns the | ||
result. Raises errors.RadioError if the read is unsuccessful, or if | ||
the length of the result falls short of @length.""" | ||
try: | ||
data = self.pipe.read(length) | ||
except Exception as e: | ||
LOG.error(f"Error reading from radio: {e}") | ||
raise errors.RadioError("Unable to read from radio") from e | ||
|
||
if len(data) != length: | ||
LOG.error( | ||
f"Short read from radio ({len(data)}, expected {length})" | ||
) | ||
LOG.debug(util.hexprint(data)) | ||
raise errors.RadioError("Short read from radio") | ||
assert type(data) == bytes | ||
return data | ||
|
||
def write_bytes(self, data): | ||
"""Writes @data to the pipe, then advances the pipe cursor to the end | ||
of what was just written. Raises errors.RadioError if the write is | ||
unsuccessful.""" | ||
try: | ||
self.pipe.write(data) | ||
if self.echos_write: | ||
echoed = self.pipe.read(len(data)) | ||
assert echoed == data | ||
except Exception as e: | ||
LOG.error(f"Error writing to radio: {e}") | ||
raise errors.RadioError("Unable to write to radio") from e | ||
|
||
def _verify_radio_ident(self): | ||
file_ident = self.inquire_model_number() | ||
if file_ident not in self.file_idents: | ||
LOG.debug( | ||
f"Model inquiry response was: {util.hexprint(file_ident)}" | ||
) | ||
raise errors.RadioError( | ||
f"Unsupported model for this driver: {str(file_ident)}" | ||
) | ||
|
||
@abstractmethod | ||
def start_session(self): | ||
"""Implement this method with whatever handshake is required to begin | ||
a programming session with the radio. This method will be automatically | ||
called by the `with` statement before executing the code in the body. | ||
""" | ||
pass | ||
|
||
@abstractmethod | ||
def end_session(self): | ||
"""Implement this method with whatever command is required to end | ||
a programming session with the radio. This method will be automatically | ||
called by the `with` statement after executing the code in the body | ||
(assuming no uncaught exception is raised). | ||
""" | ||
pass | ||
|
||
@abstractmethod | ||
def inquire_model_number(self) -> bytes: | ||
"""Implement this method with whatever code is needed to ask the radio | ||
for its model number.""" | ||
pass | ||
|
||
|
||
class AnytoneProtocol(RadioCommunicationProtocol): | ||
"""This class encapsulates the logic for communicating with any radio that | ||
uses the Anytone protocol, while remaining agnositic as to what content | ||
is actually being communicated (a data block is just a data block).""" | ||
|
||
ACK = b"\x06" | ||
CMD_BEGIN_PROGRAMMING_SESSION = b"PROGRAM" | ||
CMD_INQUIRE_MODEL = b"\x02" | ||
CMD_END_SESSION = b"\x45\x4E\x44" # aka end frame | ||
CMD_READ = b"R" | ||
CMD_WRITE = b"W" | ||
ACK_CLEAN_START = b"QX\x06" | ||
MODEL_NUMBER_FIELD_LEN = 16 # Including the version info | ||
MODEL_NUMBER_LEN = 7 # The model number itself is 7 | ||
# > = big-endian, c = char, H = unsigned short, b = signed char | ||
FRAME_HEADER_FORMAT = b">cHb" | ||
FRAME_FOOTER_FORMAT = b"BB" | ||
|
||
@classmethod | ||
def checksum(cls, data): | ||
"""Anytone's checksum algorithm.""" | ||
return sum(data) % 256 | ||
|
||
def start_session(self): | ||
self.pipe.timeout = 1 | ||
response = self._send_simple_command( | ||
self.CMD_BEGIN_PROGRAMMING_SESSION, len(self.ACK_CLEAN_START) | ||
) | ||
if response != self.ACK_CLEAN_START: | ||
LOG.debug( | ||
"Start of programming session response was: " | ||
f"{util.hexprint(response)}, expected: {self.ACK_CLEAN_START}" | ||
) | ||
raise errors.RadioError("Unsupported model or bad connection") | ||
|
||
def inquire_model_number(self) -> bytes: | ||
response = self._send_simple_command( | ||
self.CMD_INQUIRE_MODEL, self.MODEL_NUMBER_FIELD_LEN | ||
) | ||
file_ident: bytes = response[1 : self.MODEL_NUMBER_LEN + 1] | ||
file_ident.strip(b"\x00") | ||
return file_ident | ||
|
||
def end_session(self): | ||
result = self._send_simple_command(self.CMD_END_SESSION, 1) | ||
# FIXME I'm baffled as to why the radio sometimes returns \x06 as | ||
# it's supposed to but usually returns \x00 | ||
if result not in [self.ACK, b"\x00"]: | ||
LOG.debug(f"End session response:\n{util.hexprint(result)}") | ||
raise errors.RadioError("Radio did not finish cleanly.") | ||
|
||
def _send_simple_command(self, cmd, response_length) -> bytes: | ||
self.write_bytes(cmd) | ||
response = self.read_bytes(response_length) | ||
LOG.debug( | ||
f"Cmd: {util.hexprint(cmd)}, " | ||
f"Response:\n{util.hexprint(response)}" | ||
) | ||
return response | ||
|
||
def _send_frame_command(self, cmd, addr, length, data=None) -> bytes: | ||
"""Reads or writes a frame of data to the radio and then returns the | ||
response -- either the data that's read, or a simple acknowledgment in | ||
the case of a write.""" | ||
frame = struct.pack(self.FRAME_HEADER_FORMAT, cmd, addr, length) | ||
if cmd == self.CMD_WRITE: | ||
frame += data | ||
frame += struct.pack( | ||
self.FRAME_FOOTER_FORMAT, self.checksum(frame[1:]), self.ACK | ||
) | ||
self.write_bytes(frame) | ||
LOG.debug(f"Sent Frame:\n{util.hexprint(frame)}") | ||
return ( | ||
self.read_bytes(1) | ||
if cmd == self.CMD_WRITE | ||
else self.read_bytes(length + 6) | ||
) | ||
|
||
def read_block(self, addr, out_of=None) -> bytes: | ||
"""Asks the radio to return one block's worth of data found at | ||
@addr. @out_of is the number of blocks total (optional; only used in | ||
debug massages)""" | ||
result = self._send_frame_command(self.CMD_READ, addr, self.block_size) | ||
out_of_part = f" of {out_of:4x}" if out_of else "" | ||
LOG.debug( | ||
f"Frame @{addr:4x} {out_of_part}...\n{util.hexprint(result)}" | ||
) | ||
header = result[:4] | ||
data = result[4:-2] | ||
# The following colon insures that a bytes type is returned (via an | ||
# iterable) rather than an int | ||
ack = result[-1:] | ||
|
||
if ack != self.ACK: | ||
LOG.debug(f"Expected ACK, got: {repr(ack)}") | ||
raise errors.RadioError("Radio NAK'd block at %04x" % addr) | ||
_cmd, _addr, _length = struct.unpack(self.FRAME_HEADER_FORMAT, header) | ||
if _addr != addr or _length != self.block_size: | ||
LOG.debug( | ||
"Block read error, Expected length %02x, but received %02x" | ||
% (self.block_size, _length) | ||
) | ||
LOG.debug( | ||
"Block read error, Expected addr %04x, but received %04x" | ||
% (addr, _addr) | ||
) | ||
raise errors.RadioError("Radio sent an unexpected data block.") | ||
cs = self.checksum(header[1:] + data) | ||
if cs != result[-2]: | ||
LOG.debug("Calculated checksum: %02x" % cs) | ||
LOG.debug("Actual checksum: %02x" % result[-2]) | ||
raise errors.RadioError("Block at 0x%04x failed checksum" % addr) | ||
return data | ||
|
||
def write_block(self, addr, data): | ||
"""Sends @data to the radio with the instruction to write it to @addr. | ||
@data is expected to be exactly one block's worth.""" | ||
result = self._send_frame_command( | ||
self.CMD_WRITE, addr, self.block_size, data | ||
) | ||
if result != self.ACK: | ||
LOG.debug(f"write_block() expected ACK, got: {repr(result)}") | ||
raise errors.RadioError( | ||
"Radio did not accept block at %04x" % addr | ||
) |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,127 @@ | ||
from builtins import bytes | ||
import unittest | ||
import io | ||
|
||
from chirp import errors | ||
import chirp.drivers.anytone_clone as ac | ||
|
||
MODEL_VERSION = b"Ianytone" + b"\x00" * (16 - 8) | ||
SIXTEEN_BYTES = ( | ||
b"\x00\x01\x02\x03\x04\x05\x06\x07\x08\x09\x10\x11\x12\x13\x14\x15" | ||
) | ||
THIRTYTWO_BYTES = ( | ||
b"\x00\x01\x02\x03\x04\x05\x06\x07\x08\x09\x10\x11\x12\x13\x14\x15" | ||
b"\x16\x17\x18\x19\x20\x21\x22\x23\x24\x25\x26\x27\x28\x29\x30\x31" | ||
) | ||
CHECKSUM_16_BYTES = b"\x9C" | ||
CHECKSUM_32_BYTES = b"\xE8" | ||
|
||
|
||
class FakeProtocol(ac.RadioCommunicationProtocol): | ||
def __init__( | ||
self, pipe=None, block_size: int = 16, file_ident=b"faker" | ||
) -> None: | ||
super().__init__(pipe, block_size, file_ident, echos_write=False) | ||
if not pipe: | ||
pipe = io.BytesIO(b"fake data") | ||
|
||
def start_session(self): | ||
pass | ||
|
||
def end_session(self): | ||
pass | ||
|
||
def inquire_model_number(self) -> bytes: | ||
return b"faker" | ||
|
||
|
||
class TestRadioCommunicationProtocol(unittest.TestCase): | ||
"""Tests the communications protocol base class (radio-agnostic).""" | ||
|
||
def test_verify_radio_ident_single(self): | ||
proto = FakeProtocol() | ||
proto._verify_radio_ident() | ||
|
||
def test_verify_radio_ident_double(self): | ||
proto = FakeProtocol(file_ident=[b"faker", b"faker1"]) | ||
proto._verify_radio_ident() | ||
|
||
def test_verify_radio_ident_no_such(self): | ||
proto = FakeProtocol(file_ident=[b"faker1", b"faker2"]) | ||
with self.assertRaises( | ||
errors.RadioError, msg="Unsupported model for this driver: faker2" | ||
): | ||
proto._verify_radio_ident() | ||
|
||
def test_read_bytes(self): | ||
with FakeProtocol(pipe=io.BytesIO(THIRTYTWO_BYTES)) as proto: | ||
self.assertEquals(b"\x00\x01\x02\x03", proto.read_bytes(4)) | ||
self.assertEquals(b"\x04\x05", proto.read_bytes(2)) | ||
|
||
def test_read_bytes_short(self): | ||
with FakeProtocol(pipe=io.BytesIO(THIRTYTWO_BYTES)) as proto: | ||
with self.assertRaises( | ||
errors.RadioError, msg="Short read from radio" | ||
): | ||
proto.read_bytes(33) | ||
|
||
def test_write_bytes(self): | ||
proto = FakeProtocol(pipe=io.BytesIO(THIRTYTWO_BYTES)) | ||
self.assertEquals(b"\x00\x01\x02\x03", proto.read_bytes(4)) | ||
|
||
# overwrites the 5th and 6th bytes and advances the cursor | ||
proto.write_bytes(b"\xFF\xFF") | ||
|
||
self.assertEquals(b"\x06", proto.read_bytes(1)) | ||
|
||
|
||
class TestAnytoneProtocol(unittest.TestCase): | ||
"""Tests the non-radio-specific aspects of the communications protocol | ||
base class.""" | ||
|
||
def test_checksum(self): | ||
self.assertEquals(156, ac.AnytoneProtocol.checksum(SIXTEEN_BYTES)) | ||
self.assertEquals(200, ac.AnytoneProtocol.checksum(THIRTYTWO_BYTES)) | ||
|
||
def test_read_one_frame(self): | ||
CHECKSUM_BLOCK_1 = bytes( | ||
(ac.AnytoneProtocol.checksum(b"\x00\x00\x10" + SIXTEEN_BYTES),) | ||
) | ||
|
||
SESSION = ( | ||
# space to write the start command | ||
ac.AnytoneProtocol.CMD_BEGIN_PROGRAMMING_SESSION | ||
+ | ||
# read the ack | ||
ac.AnytoneProtocol.ACK_CLEAN_START | ||
+ | ||
# space to write the inquire command | ||
ac.AnytoneProtocol.CMD_INQUIRE_MODEL | ||
+ | ||
# read the model number response | ||
MODEL_VERSION | ||
+ | ||
# space to write the read-frame command | ||
b"R\x00\x00\x10" | ||
+ | ||
# read the frame: header, data, checksum, ack | ||
b"R\x00\x00\x10" | ||
+ SIXTEEN_BYTES | ||
+ CHECKSUM_BLOCK_1 | ||
+ ac.AnytoneProtocol.ACK | ||
+ | ||
# space to write the end command | ||
ac.AnytoneProtocol.CMD_END_SESSION | ||
+ | ||
# read the ack | ||
ac.AnytoneProtocol.ACK | ||
) | ||
|
||
with ac.AnytoneProtocol( | ||
pipe=io.BytesIO(SESSION), | ||
block_size=16, | ||
file_ident=b"anytone", | ||
echos_write=False, | ||
) as proto: | ||
result = proto.read_block(0) | ||
self.assertEquals(SIXTEEN_BYTES, result) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The "no new uses of future" is because you're using this compatibility layer, but shouldn't.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Oh, I didn't realize. That just carried over form the previous code and I didn't even notice it. Thanks.