Skip to content

Commit

Permalink
[WIP] Imix support
Browse files Browse the repository at this point in the history
  • Loading branch information
lschuermann committed Jan 4, 2025
1 parent 6d5c3db commit 5aa8158
Show file tree
Hide file tree
Showing 9 changed files with 264 additions and 28 deletions.
1 change: 0 additions & 1 deletion hwci/boards/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,5 +3,4 @@
# Copyright Tock Contributors 2024.

from .tockloader_board import TockloaderBoard
from .nrf52dk import Nrf52dk
from .mock_board import MockBoard
118 changes: 118 additions & 0 deletions hwci/boards/imix.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,118 @@
# Licensed under the Apache License, Version 2.0 or the MIT License.
# SPDX-License-Identifier: Apache-2.0 OR MIT
# Copyright Tock Contributors 2024.

import time
import os
import subprocess
import logging
from contextlib import contextmanager
import serial.tools.list_ports
from boards.tockloader_board import TockloaderBoard
from utils.serial_port import SerialPort
from gpio.gpio import GPIO
import yaml
import os
import traceback

class Imix(TockloaderBoard):
def __init__(self):
super().__init__()
self.arch = "cortex-m4"
self.kernel_path = os.path.join(
self.base_dir, "repos/tock")
self.kernel_board_path = os.path.join(
self.kernel_path, "boards/imix")
self.uart_port = self.get_uart_port()
self.uart_baudrate = self.get_uart_baudrate()
self.board = "imix"
self.program_method = "serial_bootloader"
self.serial = self.get_serial_port()
self.gpio = self.get_gpio_interface()
self.open_serial_during_flash = False
self.app_sha256_credential = True

def get_uart_port(self):
logging.info("Getting list of serial ports")
ports = list(serial.tools.list_ports.comports())
for port in ports:
if "imix IoT Module" in port.description:
logging.info(f"Found imix IoT programming port: {port.device}")
return port.device
if ports:
logging.info(f"Automatically selected port: {ports[0].device}")
return ports[0].device
else:
logging.error("No serial ports found")
raise Exception("No serial ports found")

def get_uart_baudrate(self):
return 115200 # Default baudrate for the board

def get_serial_port(self):
logging.info(
f"Using serial port: {self.uart_port} at baudrate {self.uart_baudrate}"
)
return SerialPort(self.uart_port, self.uart_baudrate, open_rts=False, open_dtr=True)

def get_gpio_interface(self):
return None

def cleanup(self):
if self.gpio:
for interface in self.gpio.gpio_interfaces.values():
interface.cleanup()
if self.serial:
self.serial.close()

def flash_kernel(self):
logging.info("Flashing the Tock OS kernel")
if not os.path.exists(self.kernel_path):
logging.error(f"Tock directory {self.kernel_path} not found")
raise FileNotFoundError(f"Tock directory {self.kernel_path} not found")

# Run make program from the board directory (this uses the Tock bootloader)
subprocess.run(
["make", "program"], cwd=self.kernel_board_path, check=True
)

def erase_board(self):
logging.info("Erasing the board")
# We erase all apps, but don't erase the kernel. Is there a simple way
# that we can prevent the installed kernel from starting (by
# overwriting its reset vector?)
subprocess.run(["tockloader", "erase-apps"], check=True)

def reset(self):
if self.serial.is_open():
logging.info("Performing a target reset by toggling RTS")
self.serial.set_rts(True)
time.sleep(0.1)
self.serial.set_rts(False)
else:
logging.info("Performing a target reset by reading address 0")
subprocess.run(["tockloader", "read", "0x0", "1"], check=True)

# The flash_app method is inherited from TockloaderBoard

@contextmanager
def change_directory(self, new_dir):
previous_dir = os.getcwd()
os.chdir(new_dir)
logging.info(f"Changed directory to: {os.getcwd()}")
try:
yield
finally:
os.chdir(previous_dir)
logging.info(f"Reverted to directory: {os.getcwd()}")


def load_target_spec():
# Assume the target spec file is in a fixed location
target_spec_path = os.path.join(os.getcwd(), "target_spec_imix.yaml")
with open(target_spec_path, "r") as f:
target_spec = yaml.safe_load(f)
return target_spec


board = Imix()
3 changes: 2 additions & 1 deletion hwci/boards/nrf52dk.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,13 +24,14 @@ def __init__(self):
self.kernel_path, "boards/nordic/nrf52840dk")
self.uart_port = self.get_uart_port()
self.uart_baudrate = self.get_uart_baudrate()
self.program_method = "openocd"
self.openocd_board = "nrf52dk"
self.board = "nrf52dk"
self.serial = self.get_serial_port()
self.gpio = self.get_gpio_interface()

def get_uart_port(self):
logging.info("Getting list of serial ports")
logging.info("Getting list of serial ports!")
ports = list(serial.tools.list_ports.comports())
for port in ports:
if "J-Link" in port.description:
Expand Down
30 changes: 23 additions & 7 deletions hwci/boards/tockloader_board.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@ def __init__(self):
self.board = None # Should be set in subclass
self.arch = None # Should be set in subclass
self.base_dir = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
self.program_method = "serial_bootloader"
self.app_sha256_credential = False

def flash_app(self, app):
if type(app) == str:
Expand All @@ -38,32 +40,46 @@ def flash_app(self, app):
logging.error(f"App directory {app_dir} not found")
raise FileNotFoundError(f"App directory {app_dir} not found")


make_args = [
"make",
f"TOCK_TARGETS={self.arch}"
]
# if self.app_sha256_credential:
# make_args.append("ELF2TAB_ARGS=\"--sha256\"")

# Build the app using absolute paths
logging.info(f"Building app: {app_name}")
if app_name != "lua-hello":
subprocess.run(
["make", f"TOCK_TARGETS={self.arch}"], cwd=app_dir, check=True
)
subprocess.run(make_args, cwd=app_dir, check=True)
else:
# if the app is lua-hello, we need to build the libtock-c submodule first so we need to change directory
# into the libtock-c directory so it knows we are in a git repostiory
self.change_directory(libtock_c_dir)
subprocess.run(
["make", f"TOCK_TARGETS={self.arch}"], cwd=app_dir, check=True
)
subprocess.run(make_args, cwd=app_dir, check=True)

tab_path = os.path.join(app_dir, tab_file)
if not os.path.exists(tab_path):
logging.error(f"Tab file {tab_path} not found")
raise FileNotFoundError(f"Tab file {tab_path} not found")

if self.program_method == "serial_bootloader":
program_method_arg = "--serial"
elif self.program_method == "jlink":
program_method_arg = "--jlink"
elif self.program_method == "openocd":
program_method_arg = "--openocd"
else:
raise NotImplemented(f"Unknown program method: {self.program_method}")

logging.info(f"Installing app: {app_name}")
subprocess.run(
[
"tockloader",
"install",
"--board",
self.board,
"--openocd",
program_method_arg,
tab_path,
],
check=True,
Expand Down
3 changes: 3 additions & 0 deletions hwci/target_spec_imix.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
model: imix
serial_number: 0xfoobar
pin_mappings:
41 changes: 41 additions & 0 deletions hwci/tests/console_recv_short_and_long.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
# Licensed under the Apache License, Version 2.0 OR the MIT License.
# SPDX-License-Identifier: Apache-2.0 OR MIT

import logging
from utils.test_helpers import OneshotTest
import re


class StackSizeTest01(OneshotTest):
def __init__(self):
super().__init__(apps=[
"tests/console/console_recv_short",
"tests/console/console_recv_long",
])

def oneshot_test(self, board):
serial = board.serial
return

# Wait for "Stack Test App"
output = serial.expect("Stack Test App", timeout=10)
if not output:
raise Exception("Did not receive 'Stack Test App' message")

# Wait for "Current stack pointer: 0x..."
output = serial.expect(r"Current stack pointer: 0x[0-9a-fA-F]+", timeout=5)
if not output:
raise Exception("Did not receive 'Current stack pointer' message")

# Optionally, you can extract and log the stack pointer value
match = re.search(r"Current stack pointer: (0x[0-9a-fA-F]+)", output.decode())
if match:
stack_pointer = match.group(1)
logging.info(f"Stack pointer is at {stack_pointer}")
else:
raise Exception("Failed to parse stack pointer value")

logging.info("Stack size test 01 completed successfully")


test = StackSizeTest01()
12 changes: 8 additions & 4 deletions hwci/tests/console_timeout.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,18 +16,22 @@ def oneshot_test(self, board):

# Wait for the application to initialize
logging.info("Waiting for the application to initialize...")
time.sleep(2) # Allow time for the app to start
time.sleep(1) # Allow time for the app to start

# Simulate user input by writing to the serial port
test_input = b"Hello, Tock!"
serial.write(test_input)
serial.flush_buffer()
test_input = b"Hello, Tock!\r\n"
for b in test_input:
time.sleep(0.01) # imix doesn't like this being sent too quickly!
serial.write(bytes([b]))
#serial.write(test_input)
logging.info(f"Sent test input: {test_input.decode('utf-8')}")
time.sleep(7) # Wait for the application to process

# Wait for the expected output from the application
logging.info("Waiting for the application to output the result...")
pattern = r"Userspace call to read console returned: (.*)"
output = serial.expect(pattern, timeout=10)
print(output)

if output:
received_line = output.decode("utf-8", errors="replace").strip()
Expand Down
69 changes: 55 additions & 14 deletions hwci/utils/serial_port.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,23 +12,56 @@


class SerialPort:
def __init__(self, port, baudrate=115200):
def __init__(self, port, baudrate=115200, open_rts=None, open_dtr=None):
self.port = port
self.baudrate = baudrate
try:
self.ser = serial.Serial(port, baudrate=baudrate, timeout=1)
self.child = fdpexpect.fdspawn(self.ser.fileno())
logging.info(f"Opened serial port {port} at baudrate {baudrate}")
except serial.SerialException as e:
logging.error(f"Failed to open serial port {port}: {e}")
raise
self.open_rts = open_rts
self.open_dtr = open_dtr
self.ser = None
self.child = None

def open(self):
if self.ser is None:
try:
self.ser = serial.Serial(self.port, baudrate=self.baudrate, timeout=1, exclusive=True)
self.child = fdpexpect.fdspawn(self.ser.fileno())
if self.open_rts is not None:
self.ser.rts = self.open_rts
if self.open_dtr is not None:
self.ser.dtr = self.open_dtr
logging.info(f"Opened serial port {self.port} at baudrate {self.baudrate}")
except serial.SerialException as e:
logging.error(f"Failed to open serial port {port}: {e}")
raise

def is_open(self):
return self.ser is not None

def close(self):
if self.ser is not None:
self.ser.close()
logging.info(f"Closed serial port {self.port}")
self.ser = None
self.child = None

def set_rts(self, rts):
assert self.ser is not None, "Serial port is not open!"
self.ser.rts = rts

def set_dtr(self, dtr):
assert self.ser is not None, "Serial port is not open!"
self.ser.dtr = dtr

def flush_buffer(self):
assert self.ser is not None, "Serial port is not open!"

self.ser.reset_input_buffer()
self.ser.reset_output_buffer()
logging.info("Flushed serial buffers")

def expect(self, pattern, timeout=10, timeout_error=True):
assert self.ser is not None, "Serial port is not open!"

try:
index = self.child.expect(pattern, timeout=timeout)
return self.child.after
Expand All @@ -45,26 +78,31 @@ def expect(self, pattern, timeout=10, timeout_error=True):
return None

def write(self, data):
assert self.ser is not None, "Serial port is not open!"

logging.debug(f"Writing data: {data}")
for byte in data:
self.ser.write(bytes([byte]))
time.sleep(0.1)

def close(self):
self.ser.close()
logging.info(f"Closed serial port {self.port}")


class MockSerialPort:
def __init__(self):
self.buffer = queue.Queue()
self.accumulated_data = b""
self.open = False

def open(self):
self.open = True

def write(self, data):
assert self.open, "Serial port is not open!"

logging.debug(f"Writing data: {data}")
self.buffer.put(data)

def expect(self, pattern, timeout=10, timeout_error=True):
assert self.open, "Serial port is not open!"

end_time = time.time() + timeout
compiled_pattern = re.compile(pattern.encode())
while time.time() < end_time:
Expand All @@ -81,12 +119,15 @@ def expect(self, pattern, timeout=10, timeout_error=True):
return None

def flush_buffer(self):
assert self.open, "Serial port is not open!"

self.accumulated_data = b""
while not self.buffer.empty():
self.buffer.get()

def close(self):
pass
assert self.open, "Serial port is not open!"
self.open = False

def reset_input_buffer(self):
self.flush_buffer()
Expand Down
Loading

0 comments on commit 5aa8158

Please sign in to comment.