From bbcd14dbf065ecfb045ed65ad771f3dc4670c30d Mon Sep 17 00:00:00 2001 From: Josh Wu Date: Sat, 16 Nov 2024 03:38:44 +0800 Subject: [PATCH] Support Periodic Advertising --- bumble/controller.py | 56 ++++++++++++++++++++++++++++ bumble/device.py | 89 +++++++++++++++++++++++++++++++++----------- bumble/hci.py | 55 +++++++++++++++++++++++++++ tests/device_test.py | 65 +++++++++++++++++++++++++++----- 4 files changed, 233 insertions(+), 32 deletions(-) diff --git a/bumble/controller.py b/bumble/controller.py index f4cbe953..267f3e55 100644 --- a/bumble/controller.py +++ b/bumble/controller.py @@ -1543,6 +1543,41 @@ def on_hci_le_set_default_phy_command(self, command): } return bytes([HCI_SUCCESS]) + def on_hci_le_set_advertising_set_random_address_command(self, _command): + ''' + See Bluetooth spec Vol 4, Part E - 7.8.52 LE Set Advertising Set Random Address + Command + ''' + return bytes([HCI_SUCCESS]) + + def on_hci_le_set_extended_advertising_parameters_command(self, _command): + ''' + See Bluetooth spec Vol 4, Part E - 7.8.53 LE Set Extended Advertising Parameters + Command + ''' + return bytes([HCI_SUCCESS, 0]) + + def on_hci_le_set_extended_advertising_data_command(self, _command): + ''' + See Bluetooth spec Vol 4, Part E - 7.8.54 LE Set Extended Advertising Data + Command + ''' + return bytes([HCI_SUCCESS]) + + def on_hci_le_set_extended_scan_response_data_command(self, _command): + ''' + See Bluetooth spec Vol 4, Part E - 7.8.55 LE Set Extended Scan Response Data + Command + ''' + return bytes([HCI_SUCCESS]) + + def on_hci_le_set_extended_advertising_enable_command(self, _command): + ''' + See Bluetooth spec Vol 4, Part E - 7.8.56 LE Set Extended Advertising Enable + Command + ''' + return bytes([HCI_SUCCESS]) + def on_hci_le_read_maximum_advertising_data_length_command(self, _command): ''' See Bluetooth spec Vol 4, Part E - 7.8.57 LE Read Maximum Advertising Data @@ -1557,6 +1592,27 @@ def on_hci_le_read_number_of_supported_advertising_sets_command(self, _command): ''' return struct.pack(' None: super().__init__() @@ -603,7 +611,7 @@ async def set_advertising_parameters( int(advertising_parameters.primary_advertising_interval_min / 0.625) ), primary_advertising_interval_max=( - int(advertising_parameters.primary_advertising_interval_min / 0.625) + int(advertising_parameters.primary_advertising_interval_max / 0.625) ), primary_advertising_channel_map=int( advertising_parameters.primary_advertising_channel_map @@ -671,10 +679,26 @@ async def set_scan_response_data(self, scan_response_data: bytes) -> None: async def set_periodic_advertising_parameters( self, advertising_parameters: PeriodicAdvertisingParameters ) -> None: + await self.device.send_command( + hci.HCI_LE_Set_Periodic_Advertising_Parameters_Command( + advertising_handle=self.advertising_handle, + periodic_advertising_interval_min=advertising_parameters.periodic_advertising_interval_min, + periodic_advertising_interval_max=advertising_parameters.periodic_advertising_interval_max, + periodic_advertising_properties=advertising_parameters.periodic_advertising_properties, + ), + check_result=True, + ) self.periodic_advertising_parameters = advertising_parameters async def set_periodic_advertising_data(self, advertising_data: bytes) -> None: - # TODO: send command + await self.device.send_command( + hci.HCI_LE_Set_Periodic_Advertising_Data_Command( + advertising_handle=self.advertising_handle, + operation=hci.HCI_LE_Set_Extended_Advertising_Data_Command.Operation.COMPLETE_DATA, + advertising_data=advertising_data, + ), + check_result=True, + ) self.periodic_advertising_data = advertising_data async def set_random_address(self, random_address: hci.Address) -> None: @@ -712,17 +736,6 @@ async def start( self.emit('start') - async def start_periodic(self, include_adi: bool = False) -> None: - await self.device.send_command( - hci.HCI_LE_Set_Periodic_Advertising_Enable_Command( - enable=1 | (2 if include_adi else 0), - advertising_handles=self.advertising_handle, - ), - check_result=True, - ) - - self.emit('start_periodic') - async def stop(self) -> None: await self.device.send_command( hci.HCI_LE_Set_Extended_Advertising_Enable_Command( @@ -737,14 +750,31 @@ async def stop(self) -> None: self.emit('stop') + async def start_periodic(self, include_adi: bool = False) -> None: + if self.periodic_enabled: + return + await self.device.send_command( + hci.HCI_LE_Set_Periodic_Advertising_Enable_Command( + enable=1 | (2 if include_adi else 0), + advertising_handle=self.advertising_handle, + ), + check_result=True, + ) + self.periodic_enabled = True + + self.emit('start_periodic') + async def stop_periodic(self) -> None: + if not self.periodic_enabled: + return await self.device.send_command( hci.HCI_LE_Set_Periodic_Advertising_Enable_Command( enable=0, - advertising_handles=self.advertising_handle, + advertising_handle=self.advertising_handle, ), check_result=True, ) + self.periodic_enabled = False self.emit('stop_periodic') @@ -2460,14 +2490,27 @@ async def create_advertising_set( if advertising_parameters is None: advertising_parameters = AdvertisingParameters() + if periodic_advertising_data and periodic_advertising_parameters is None: + periodic_advertising_parameters = PeriodicAdvertisingParameters() + if ( not advertising_parameters.advertising_event_properties.is_legacy and advertising_data and scan_response_data ): raise InvalidArgumentError( - "Extended advertisements can't have both data and scan \ - response data" + "Extended advertisements can't have both data and scan response data" + ) + + if periodic_advertising_parameters and ( + advertising_parameters.advertising_event_properties.is_connectable + or advertising_parameters.advertising_event_properties.is_scannable + or advertising_parameters.advertising_event_properties.is_anonymous + or advertising_parameters.advertising_event_properties.is_legacy + ): + raise InvalidArgumentError( + "Periodic advertising set cannot be connectable, scannable, anonymous," + "or legacy" ) # Allocate a new handle @@ -2522,12 +2565,14 @@ async def create_advertising_set( await advertising_set.set_scan_response_data(scan_response_data) if periodic_advertising_parameters: - # TODO: call LE Set Periodic Advertising Parameters command - raise NotImplementedError('periodic advertising not yet supported') + await advertising_set.set_periodic_advertising_parameters( + periodic_advertising_parameters + ) if periodic_advertising_data: - # TODO: call LE Set Periodic Advertising Data command - raise NotImplementedError('periodic advertising not yet supported') + await advertising_set.set_periodic_advertising_data( + periodic_advertising_data + ) except hci.HCI_Error as error: # Remove the advertising set so that it doesn't stay dangling in the diff --git a/bumble/hci.py b/bumble/hci.py index da804ba1..ce26596e 100644 --- a/bumble/hci.py +++ b/bumble/hci.py @@ -4331,6 +4331,61 @@ class HCI_LE_Clear_Advertising_Sets_Command(HCI_Command): ''' +# ----------------------------------------------------------------------------- +@HCI_Command.command( + [ + ('advertising_handle', 1), + ('periodic_advertising_interval_min', 2), + ('periodic_advertising_interval_max', 2), + ('periodic_advertising_properties', 2), + ] +) +class HCI_LE_Set_Periodic_Advertising_Parameters_Command(HCI_Command): + ''' + See Bluetooth spec @ 7.8.61 LE Set Periodic Advertising Parameters command + ''' + + class Properties(enum.IntFlag): + INCLUDE_TX_POWER = 1 << 6 + + advertising_handle: int + periodic_advertising_interval_min: int + periodic_advertising_interval_max: int + periodic_advertising_properties: int + + +# ----------------------------------------------------------------------------- +@HCI_Command.command( + [ + ('advertising_handle', 1), + ( + 'operation', + { + 'size': 1, + 'mapper': lambda x: HCI_LE_Set_Extended_Advertising_Data_Command.Operation( + x + ).name, + }, + ), + ( + 'advertising_data', + { + 'parser': HCI_Object.parse_length_prefixed_bytes, + 'serializer': HCI_Object.serialize_length_prefixed_bytes, + }, + ), + ] +) +class HCI_LE_Set_Periodic_Advertising_Data_Command(HCI_Command): + ''' + See Bluetooth spec @ 7.8.62 LE Set Periodic Advertising Data command + ''' + + advertising_handle: int + operation: int + advertising_data: bytes + + # ----------------------------------------------------------------------------- @HCI_Command.command([('enable', 1), ('advertising_handle', 1)]) class HCI_LE_Set_Periodic_Advertising_Enable_Command(HCI_Command): diff --git a/tests/device_test.py b/tests/device_test.py index 45b84ce1..1f6175ab 100644 --- a/tests/device_test.py +++ b/tests/device_test.py @@ -19,9 +19,7 @@ import functools import logging import os -from types import LambdaType import pytest -from unittest import mock from bumble.core import ( BT_BR_EDR_TRANSPORT, @@ -29,7 +27,13 @@ BT_PERIPHERAL_ROLE, ConnectionParameters, ) -from bumble.device import AdvertisingParameters, Connection, Device +from bumble.device import ( + AdvertisingEventProperties, + AdvertisingParameters, + Connection, + Device, + PeriodicAdvertisingParameters, +) from bumble.host import AclPacketQueue, Host from bumble.hci import ( HCI_ACCEPT_CONNECTION_REQUEST_COMMAND, @@ -265,7 +269,8 @@ async def test_flush(): # ----------------------------------------------------------------------------- @pytest.mark.asyncio async def test_legacy_advertising(): - device = Device(host=mock.AsyncMock(Host)) + device = TwoDevices()[0] + await device.power_on() # Start advertising await device.start_advertising() @@ -283,7 +288,10 @@ async def test_legacy_advertising(): ) @pytest.mark.asyncio async def test_legacy_advertising_disconnection(auto_restart): - device = Device(host=mock.AsyncMock(spec=Host)) + devices = TwoDevices() + device = devices[0] + devices.controllers[0].le_features = bytes.fromhex('ffffffffffffffff') + await device.power_on() peer_address = Address('F0:F1:F2:F3:F4:F5') await device.start_advertising(auto_restart=auto_restart) device.on_connection( @@ -305,6 +313,11 @@ async def test_legacy_advertising_disconnection(auto_restart): await async_barrier() if auto_restart: + assert device.legacy_advertising_set + started = asyncio.Event() + if not device.is_advertising: + device.legacy_advertising_set.once('start', started.set) + await asyncio.wait_for(started.wait(), _TIMEOUT) assert device.is_advertising else: assert not device.is_advertising @@ -313,7 +326,8 @@ async def test_legacy_advertising_disconnection(auto_restart): # ----------------------------------------------------------------------------- @pytest.mark.asyncio async def test_extended_advertising(): - device = Device(host=mock.AsyncMock(Host)) + device = TwoDevices()[0] + await device.power_on() # Start advertising advertising_set = await device.create_advertising_set() @@ -332,7 +346,8 @@ async def test_extended_advertising(): ) @pytest.mark.asyncio async def test_extended_advertising_connection(own_address_type): - device = Device(host=mock.AsyncMock(spec=Host)) + device = TwoDevices()[0] + await device.power_on() peer_address = Address('F0:F1:F2:F3:F4:F5') advertising_set = await device.create_advertising_set( advertising_parameters=AdvertisingParameters(own_address_type=own_address_type) @@ -368,8 +383,10 @@ async def test_extended_advertising_connection(own_address_type): ) @pytest.mark.asyncio async def test_extended_advertising_connection_out_of_order(own_address_type): - device = Device(host=mock.AsyncMock(spec=Host)) - peer_address = Address('F0:F1:F2:F3:F4:F5') + devices = TwoDevices() + device = devices[0] + devices.controllers[0].le_features = bytes.fromhex('ffffffffffffffff') + await device.power_on() advertising_set = await device.create_advertising_set( advertising_parameters=AdvertisingParameters(own_address_type=own_address_type) ) @@ -382,7 +399,7 @@ async def test_extended_advertising_connection_out_of_order(own_address_type): device.on_connection( 0x0001, BT_LE_TRANSPORT, - peer_address, + Address('F0:F1:F2:F3:F4:F5'), None, None, BT_PERIPHERAL_ROLE, @@ -397,6 +414,34 @@ async def test_extended_advertising_connection_out_of_order(own_address_type): await async_barrier() +# ----------------------------------------------------------------------------- +@pytest.mark.asyncio +async def test_periodic_advertising(): + device = TwoDevices()[0] + await device.power_on() + + # Start advertising + advertising_set = await device.create_advertising_set( + advertising_parameters=AdvertisingParameters( + advertising_event_properties=AdvertisingEventProperties( + is_connectable=False + ) + ), + advertising_data=b'123', + periodic_advertising_parameters=PeriodicAdvertisingParameters(), + periodic_advertising_data=b'abc', + ) + assert device.extended_advertising_sets + assert advertising_set.enabled + assert not advertising_set.periodic_enabled + + await advertising_set.start_periodic() + assert advertising_set.periodic_enabled + + await advertising_set.stop_periodic() + assert not advertising_set.periodic_enabled + + # ----------------------------------------------------------------------------- @pytest.mark.asyncio async def test_get_remote_le_features():