Skip to content

Commit

Permalink
Allow to define descriptor for GATT characteristic
Browse files Browse the repository at this point in the history
  • Loading branch information
mklemarczyk committed Jun 23, 2024
1 parent 6da1a59 commit f269f2f
Show file tree
Hide file tree
Showing 16 changed files with 630 additions and 21 deletions.
15 changes: 14 additions & 1 deletion bless/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,11 @@
BlessGATTCharacteristicBlueZDBus as BlessGATTCharacteristic,
)

# Descriptor Classes
from bless.backends.bluezdbus.descriptor import ( # noqa: F401
BlessGATTDescriptorBlueZDBus as BlessGATTDescriptor,
)

elif sys.platform == "win32":

# Server
Expand All @@ -52,11 +57,19 @@
BlessGATTCharacteristicWinRT as BlessGATTCharacteristic,
)

# type: ignore
from bless.backends.attribute import ( # noqa: E402 F401
GATTAttributePermissions,
)

# type: ignore
from bless.backends.characteristic import ( # noqa: E402 F401
GATTCharacteristicProperties,
GATTAttributePermissions,
)

# type: ignore
from bless.backends.descriptor import ( # noqa: E402 F401
GATTDescriptorProperties,
)


Expand Down
7 changes: 7 additions & 0 deletions bless/backends/attribute.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
from enum import Flag

class GATTAttributePermissions(Flag):
readable = 0x1
writeable = 0x2
read_encryption_required = 0x4
write_encryption_required = 0x8
20 changes: 19 additions & 1 deletion bless/backends/bluezdbus/characteristic.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@

from typing import Union, Optional, List, Dict, cast, TYPE_CHECKING

from bless.backends.bluezdbus.descriptor import BlessGATTDescriptorBlueZDBus
from bleak.backends.bluezdbus.characteristic import ( # type: ignore
_GattCharacteristicsFlagsEnum,
BleakGATTCharacteristicBlueZDBus,
Expand All @@ -19,10 +20,12 @@
from bless.backends.bluezdbus.service import BlessGATTServiceBlueZDBus
from bless.backends.service import BlessGATTService

from bless.backends.attribute import ( # type: ignore
GATTAttributePermissions,
)
from bless.backends.characteristic import ( # noqa: E402
BlessGATTCharacteristic,
GATTCharacteristicProperties,
GATTAttributePermissions,
)

from bless.backends.bluezdbus.dbus.characteristic import ( # noqa: E402
Expand Down Expand Up @@ -63,6 +66,7 @@ def __init__(
"""
value = value if value is not None else bytearray(b"")
super().__init__(uuid, properties, permissions, value)
self.__descriptors: List[BlessGATTDescriptorBlueZDBus] = []
self.value = value

async def init(self, service: "BlessGATTService"):
Expand Down Expand Up @@ -118,6 +122,20 @@ def uuid(self) -> str:
"""The uuid of this characteristic"""
return self.obj.get("UUID").value

@property
def descriptors(self) -> List[BlessGATTDescriptorBlueZDBus]: # type: ignore
"""List of characteristics for this service"""
return self.__descriptors

def add_descriptor( # type: ignore
self,
descriptor: BlessGATTDescriptorBlueZDBus
):
"""
Should not be used by end user, but rather by `bleak` itself.
"""
self.__descriptors.append(descriptor)

def transform_flags_with_permissions(flag: Flags, permissions: GATTAttributePermissions) -> Flags:
"""
Returns the encrypted variant of a flag if the corresponding permission is set
Expand Down
32 changes: 29 additions & 3 deletions bless/backends/bluezdbus/dbus/characteristic.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,16 +2,17 @@

import bleak.backends.bluezdbus.defs as defs # type: ignore

from typing import List, Dict, TYPE_CHECKING
from typing import List, TYPE_CHECKING, Any, Dict

from dbus_next.service import ServiceInterface, method, dbus_property # type: ignore
from dbus_next.constants import PropertyAccess # type: ignore
from dbus_next.signature import Variant # type: ignore

from .descriptor import BlueZGattDescriptor, DescriptorFlags # type: ignore

if TYPE_CHECKING:
from bless.backends.bluezdbus.dbus.service import ( # type: ignore # noqa: F401
BlueZGattService,
BlueZGattDescriptor
BlueZGattService
)


Expand Down Expand Up @@ -163,6 +164,31 @@ def StopNotify(self): # noqa: N802
f(None)
self._service.app.subscribed_characteristics.remove(self._uuid)

async def add_descriptor(
self, uuid: str, flags: List[DescriptorFlags], value: Any
) -> BlueZGattDescriptor:
"""
Adds a BlueZGattDescriptor to the characteristic.
Parameters
----------
uuid : str
The string representation of the UUID for the descriptor
flags : List[DescriptorFlags],
A list of flags to apply to the descriptor
value : Any
The descriptor's value
"""
index: int = len(self.descriptors) + 1
descriptor: BlueZGattDescriptor = BlueZGattDescriptor(
uuid, flags, index, self
)
descriptor._value = value # type: ignore
self.descriptors.append(descriptor)
await self._service.app._register_object(descriptor)
return descriptor


async def get_obj(self) -> Dict:
"""
Obtain the underlying dictionary within the BlueZ API that describes
Expand Down
134 changes: 134 additions & 0 deletions bless/backends/bluezdbus/dbus/descriptor.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,134 @@
from enum import Enum

import bleak.backends.bluezdbus.defs as defs # type: ignore

from typing import List, Dict, TYPE_CHECKING

from dbus_next.service import ServiceInterface, method, dbus_property # type: ignore
from dbus_next.constants import PropertyAccess # type: ignore
from dbus_next.signature import Variant # type: ignore

if TYPE_CHECKING:
from bless.backends.bluezdbus.dbus.characteristic import ( # type: ignore # noqa: F401
BlueZGattCharacteristic
)


class DescriptorFlags(Enum):
READ = "read"
WRITE = "write"
ENCRYPT_READ = "encrypt-read"
ENCRYPT_WRITE = "encrypt-write"
ENCRYPT_AUTHENTICATED_READ = "encrypt-authenticated-read"
ENCRYPT_AUTHENTICATED_WRITE = "encrypt-authenticated-write"
AUTHORIZE = "authorize"


class BlueZGattDescriptor(ServiceInterface):
"""
org.bluez.GattDescriptor1 interface implementation
"""

interface_name: str = defs.GATT_DESCRIPTOR_INTERFACE

def __init__(
self,
uuid: str,
flags: List[DescriptorFlags],
index: int,
characteristic: "BlueZGattCharacteristic", # noqa: F821
):
"""
Create a BlueZ Gatt Descriptor
Parameters
----------
uuid : str
The unique identifier for the descriptor
flags : List[DescriptorFlags]
A list of strings that represent the properties of the
descriptor
index : int
The index number for this descriptor in the descriptors
characteristic : BlueZService
The Gatt Characteristic that owns this descriptor
"""
self.path: str = characteristic.path + "/desc" + f"{index:04d}"
self._uuid: str = uuid
self._flags: List[str] = [x.value for x in flags]
self._characteristic_path: str = characteristic.path # noqa: F821
self._characteristic: "BlueZGattCharacteristic" = characteristic # noqa: F821

self._value: bytes = b""

super(BlueZGattDescriptor, self).__init__(self.interface_name)

@dbus_property(access=PropertyAccess.READ)
def UUID(self) -> "s": # type: ignore # noqa: F821 N802
return self._uuid

@dbus_property(access=PropertyAccess.READ)
def Characteristic(self) -> "o": # type: ignore # noqa: F821 N802
return self._characteristic_path

@dbus_property()
def Value(self) -> "ay": # type: ignore # noqa: F821 N802
return self._value

@Value.setter # type: ignore
def Value(self, value: "ay"): # type: ignore # noqa: F821 N802
self._value = value
self.emit_properties_changed(
changed_properties={"Value": self._value}
)

@dbus_property(access=PropertyAccess.READ) # noqa: F722
def Flags(self) -> "as": # type: ignore # noqa: F821 F722 N802
return self._flags

@method() # noqa: F722
def ReadValue(self, options: "a{sv}") -> "ay": # type: ignore # noqa: F722 F821 N802 E501
"""
Read the value of the descriptor.
This is to be fully implemented at the application level
Parameters
----------
options : Dict
A list of options
Returns
-------
bytes
The bytes that is the value of the descriptor
"""
return self._value

@method() # noqa: F722
def WriteValue(self, value: "ay", options: "a{sv}"): # type: ignore # noqa
"""
Write a value to the descriptor
This is to be fully implemented at the application level
Parameters
----------
value : bytes
The value to set
options : Dict
Some options for you to select from
"""
self._value = value

async def get_obj(self) -> Dict:
"""
Obtain the underlying dictionary within the BlueZ API that describes
the descriptor
Returns
-------
Dict
The dictionary that describes the descriptor
"""
return {
"UUID": Variant('s', self._uuid)
}
Loading

0 comments on commit f269f2f

Please sign in to comment.