From 464a476f9fc1550e7f30c6f41f2d0f26b195edda Mon Sep 17 00:00:00 2001 From: Josh Wu Date: Mon, 27 Nov 2023 23:10:19 +0800 Subject: [PATCH] Add CSIP --- .vscode/settings.json | 3 + bumble/profiles/csip.py | 147 ++++++++++++++++++++++++++++++++++++++++ tests/csip_test.py | 74 ++++++++++++++++++++ tests/test_utils.py | 3 + 4 files changed, 227 insertions(+) create mode 100644 bumble/profiles/csip.py create mode 100644 tests/csip_test.py diff --git a/.vscode/settings.json b/.vscode/settings.json index 466158f5..4011e643 100644 --- a/.vscode/settings.json +++ b/.vscode/settings.json @@ -21,6 +21,7 @@ "cccds", "cmac", "CONNECTIONLESS", + "csip", "csrcs", "datagram", "DATALINK", @@ -45,6 +46,7 @@ "NONCONN", "OXIMETER", "popleft", + "PRAND", "protobuf", "psms", "pyee", @@ -56,6 +58,7 @@ "SEID", "seids", "SERV", + "SIRK", "ssrc", "strerror", "subband", diff --git a/bumble/profiles/csip.py b/bumble/profiles/csip.py new file mode 100644 index 00000000..9657246b --- /dev/null +++ b/bumble/profiles/csip.py @@ -0,0 +1,147 @@ +# Copyright 2021-2023 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# https://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +# ----------------------------------------------------------------------------- +# Imports +# ----------------------------------------------------------------------------- +from __future__ import annotations +import enum +import struct +from typing import Optional + +from bumble import gatt +from bumble import gatt_client + + +# ----------------------------------------------------------------------------- +# Constants +# ----------------------------------------------------------------------------- +class SirkType(enum.IntEnum): + '''Coordinated Set Identification Service - 5.1 Set Identity Resolving Key.''' + + ENCRYPTED = 0x00 + PLAINTEXT = 0x01 + + +class MemberLock(enum.IntEnum): + '''Coordinated Set Identification Service - 5.3 Set Member Lock.''' + + UNLOCKED = 0x01 + LOCKED = 0x02 + + +# ----------------------------------------------------------------------------- +# Utils +# ----------------------------------------------------------------------------- +# TODO: Implement RSI Generator + + +# ----------------------------------------------------------------------------- +# Server +# ----------------------------------------------------------------------------- +class CoordinatedSetIdentificationService(gatt.TemplateService): + UUID = gatt.GATT_COORDINATED_SET_IDENTIFICATION_SERVICE + + set_identity_resolving_key_characteristic: gatt.Characteristic + coordinated_set_size_characteristic: Optional[gatt.Characteristic] = None + set_member_lock_characteristic: Optional[gatt.Characteristic] = None + set_member_rank_characteristic: Optional[gatt.Characteristic] = None + + def __init__( + self, + set_identity_resolving_key: bytes, + coordinated_set_size: Optional[int] = None, + set_member_lock: Optional[MemberLock] = None, + set_member_rank: Optional[int] = None, + ) -> None: + characteristics = [] + + self.set_identity_resolving_key_characteristic = gatt.Characteristic( + uuid=gatt.GATT_SET_IDENTITY_RESOLVING_KEY_CHARACTERISTIC, + properties=gatt.Characteristic.Properties.READ + | gatt.Characteristic.Properties.NOTIFY, + permissions=gatt.Characteristic.Permissions.READABLE, + # TODO: Implement encrypted SIRK reader. + value=struct.pack('B', SirkType.PLAINTEXT) + set_identity_resolving_key, + ) + characteristics.append(self.set_identity_resolving_key_characteristic) + + if coordinated_set_size is not None: + self.coordinated_set_size_characteristic = gatt.Characteristic( + uuid=gatt.GATT_COORDINATED_SET_SIZE_CHARACTERISTIC, + properties=gatt.Characteristic.Properties.READ + | gatt.Characteristic.Properties.NOTIFY, + permissions=gatt.Characteristic.Permissions.READABLE, + value=struct.pack('B', coordinated_set_size), + ) + characteristics.append(self.coordinated_set_size_characteristic) + + if set_member_lock is not None: + self.set_member_lock_characteristic = gatt.Characteristic( + uuid=gatt.GATT_SET_MEMBER_LOCK_CHARACTERISTIC, + properties=gatt.Characteristic.Properties.READ + | gatt.Characteristic.Properties.NOTIFY + | gatt.Characteristic.Properties.WRITE, + permissions=gatt.Characteristic.Permissions.READABLE + | gatt.Characteristic.Permissions.WRITEABLE, + value=struct.pack('B', set_member_lock), + ) + characteristics.append(self.set_member_lock_characteristic) + + if set_member_rank is not None: + self.set_member_rank_characteristic = gatt.Characteristic( + uuid=gatt.GATT_SET_MEMBER_RANK_CHARACTERISTIC, + properties=gatt.Characteristic.Properties.READ + | gatt.Characteristic.Properties.NOTIFY, + permissions=gatt.Characteristic.Permissions.READABLE, + value=struct.pack('B', set_member_rank), + ) + characteristics.append(self.set_member_rank_characteristic) + + super().__init__(characteristics) + + +# ----------------------------------------------------------------------------- +# Client +# ----------------------------------------------------------------------------- +class CoordinatedSetIdentificationProxy(gatt_client.ProfileServiceProxy): + SERVICE_CLASS = CoordinatedSetIdentificationService + + set_identity_resolving_key: gatt_client.CharacteristicProxy + coordinated_set_size: Optional[gatt_client.CharacteristicProxy] = None + set_member_lock: Optional[gatt_client.CharacteristicProxy] = None + set_member_rank: Optional[gatt_client.CharacteristicProxy] = None + + def __init__(self, service_proxy: gatt_client.ServiceProxy) -> None: + self.service_proxy = service_proxy + + self.set_identity_resolving_key = service_proxy.get_characteristics_by_uuid( + gatt.GATT_SET_IDENTITY_RESOLVING_KEY_CHARACTERISTIC + )[0] + + if characteristics := service_proxy.get_characteristics_by_uuid( + gatt.GATT_COORDINATED_SET_SIZE_CHARACTERISTIC + ): + self.coordinated_set_size = characteristics[0] + + if characteristics := service_proxy.get_characteristics_by_uuid( + gatt.GATT_SET_MEMBER_LOCK_CHARACTERISTIC + ): + self.set_member_lock = characteristics[0] + + if characteristics := service_proxy.get_characteristics_by_uuid( + gatt.GATT_SET_MEMBER_RANK_CHARACTERISTIC + ): + self.set_member_rank = characteristics[0] diff --git a/tests/csip_test.py b/tests/csip_test.py new file mode 100644 index 00000000..6f2c7fda --- /dev/null +++ b/tests/csip_test.py @@ -0,0 +1,74 @@ +# Copyright 2021-2023 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# https://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +# ----------------------------------------------------------------------------- +# Imports +# ----------------------------------------------------------------------------- +import asyncio +import os +import pytest +import struct +import logging + +from bumble import device +from bumble.profiles import csip +from .test_utils import TwoDevices + +# ----------------------------------------------------------------------------- +# Logging +# ----------------------------------------------------------------------------- +logger = logging.getLogger(__name__) + + +# ----------------------------------------------------------------------------- +@pytest.mark.asyncio +async def test_csis(): + SIRK = bytes.fromhex('2f62c8ae41867d1bb619e788a2605faa') + + devices = TwoDevices() + devices[0].add_service( + csip.CoordinatedSetIdentificationService( + set_identity_resolving_key=SIRK, + coordinated_set_size=2, + set_member_lock=csip.MemberLock.UNLOCKED, + set_member_rank=0, + ) + ) + + await devices.setup_connection() + peer = device.Peer(devices.connections[1]) + csis_client = await peer.discover_service_and_create_proxy( + csip.CoordinatedSetIdentificationProxy + ) + + assert ( + await csis_client.set_identity_resolving_key.read_value() + == bytes([csip.SirkType.PLAINTEXT]) + SIRK + ) + assert await csis_client.coordinated_set_size.read_value() == struct.pack('B', 2) + assert await csis_client.set_member_lock.read_value() == struct.pack( + 'B', csip.MemberLock.UNLOCKED + ) + assert await csis_client.set_member_rank.read_value() == struct.pack('B', 0) + + +# ----------------------------------------------------------------------------- +async def run(): + await test_csis() + + +# ----------------------------------------------------------------------------- +if __name__ == '__main__': + logging.basicConfig(level=os.environ.get('BUMBLE_LOGLEVEL', 'INFO').upper()) + asyncio.run(run()) diff --git a/tests/test_utils.py b/tests/test_utils.py index f19f18c8..bf36e2d0 100644 --- a/tests/test_utils.py +++ b/tests/test_utils.py @@ -71,3 +71,6 @@ async def setup_connection(self) -> None: # Check the post conditions assert self.connections[0] is not None assert self.connections[1] is not None + + def __getitem__(self, index: int) -> Device: + return self.devices[index]