From 9f4c1862588c047dee6b69716ef173ebd60c09b3 Mon Sep 17 00:00:00 2001 From: Josh Wu Date: Fri, 17 Nov 2023 00:29:26 +0800 Subject: [PATCH] Add support for Extended Advertising --- bumble/device.py | 128 ++++++++++++++++++++++++++++ bumble/hci.py | 98 ++++++--------------- examples/run_extended_advertiser.py | 69 +++++++++++++++ 3 files changed, 223 insertions(+), 72 deletions(-) create mode 100644 examples/run_extended_advertiser.py diff --git a/bumble/device.py b/bumble/device.py index 45e919deb..6b0c2b84e 100644 --- a/bumble/device.py +++ b/bumble/device.py @@ -32,6 +32,7 @@ Optional, Tuple, Type, + Set, Union, cast, overload, @@ -103,9 +104,14 @@ HCI_LE_Set_Advertising_Data_Command, HCI_LE_Set_Advertising_Enable_Command, HCI_LE_Set_Advertising_Parameters_Command, + HCI_LE_Set_Advertising_Set_Random_Address_Command, HCI_LE_Set_Default_PHY_Command, HCI_LE_Set_Extended_Scan_Enable_Command, HCI_LE_Set_Extended_Scan_Parameters_Command, + HCI_LE_Set_Extended_Scan_Response_Data_Command, + HCI_LE_Set_Extended_Advertising_Data_Command, + HCI_LE_Set_Extended_Advertising_Enable_Command, + HCI_LE_Set_Extended_Advertising_Parameters_Command, HCI_LE_Set_PHY_Command, HCI_LE_Set_Random_Address_Command, HCI_LE_Set_Scan_Enable_Command, @@ -956,6 +962,7 @@ class Device(CompositeEventEmitter): ] advertisement_accumulators: Dict[Address, AdvertisementDataAccumulator] config: DeviceConfiguration + extended_advertising_handles: Set[int] @composite_listener class Listener: @@ -1054,6 +1061,7 @@ def __init__( self.classic_pending_accepts = { Address.ANY: [] } # Futures, by BD address OR [Futures] for Address.ANY + self.extended_advertising_handles = set() # Own address type cache self.advertising_own_address_type = None @@ -1532,6 +1540,126 @@ async def stop_advertising(self) -> None: self.advertising = False self.auto_restart_advertising = False + async def start_extended_advertising( + self, + advertising_properties: HCI_LE_Set_Extended_Advertising_Parameters_Command.AdvertisingProperties = HCI_LE_Set_Extended_Advertising_Parameters_Command.AdvertisingProperties.CONNECTABLE_ADVERTISING, + target: Address = Address.ANY, + own_address_type: int = OwnAddressType.RANDOM, + scan_response: Optional[bytes] = None, + advertising_data: Optional[bytes] = None, + ) -> int: + """Starts an extended advertising set. + + Args: + advertising_properties: Properties to pass in HCI_LE_Set_Extended_Advertising_Parameters_Command + target: Directed advertising target. Directed property should be set in advertising_properties arg. + own_address_type: own address type to use in the advertising. + scan_response: raw scan response. When a non-none value is set, HCI_LE_Set_Extended_Scan_Response_Data_Command will be sent. + advertising_data: raw advertising data. When a non-none value is set, HCI_LE_Set_Advertising_Set_Random_Address_Command will be sent. + + Returns: + Handle of the new advertising set. + """ + + adv_handle = ( + 0 + if not self.extended_advertising_handles + else max(self.extended_advertising_handles) + 1 + ) + self.extended_advertising_handles.add(adv_handle) + + # Set the advertising parameters + await self.send_command( + HCI_LE_Set_Extended_Advertising_Parameters_Command( + advertising_handle=adv_handle, + advertising_event_properties=advertising_properties, + primary_advertising_interval_min=self.advertising_interval_min, + primary_advertising_interval_max=self.advertising_interval_max, + primary_advertising_channel_map=( + HCI_LE_Set_Extended_Advertising_Parameters_Command.ChannelMap.CHANNEL_37 + | HCI_LE_Set_Extended_Advertising_Parameters_Command.ChannelMap.CHANNEL_38 + | HCI_LE_Set_Extended_Advertising_Parameters_Command.ChannelMap.CHANNEL_39 + ), + own_address_type=own_address_type, + peer_address_type=target.address_type, + peer_address=target, + advertising_tx_power=7, + advertising_filter_policy=0, + primary_advertising_phy=1, # LE 1M + secondary_advertising_max_skip=0, + secondary_advertising_phy=1, # LE 1M + advertising_sid=0, + scan_request_notification_enable=0, + ), # type: ignore[call-arg] + check_result=True, + ) + + # Set the advertising data if present + if advertising_data is not None: + await self.send_command( + HCI_LE_Set_Extended_Advertising_Data_Command( + advertising_handle=adv_handle, + operation=HCI_LE_Set_Extended_Advertising_Data_Command.Operation.COMPLETE_DATA, + fragment_preference=0x01, # Should not fragment + advertising_data=advertising_data, + ), # type: ignore[call-arg] + check_result=True, + ) + + # Set the scan response if present + if scan_response is not None: + await self.send_command( + HCI_LE_Set_Extended_Scan_Response_Data_Command( + advertising_handle=adv_handle, + operation=HCI_LE_Set_Extended_Advertising_Data_Command.Operation.COMPLETE_DATA, + fragment_preference=0x01, # Should not fragment + scan_response_data=scan_response, + ), # type: ignore[call-arg] + check_result=True, + ) + + if own_address_type in ( + OwnAddressType.RANDOM, + OwnAddressType.RESOLVABLE_OR_RANDOM, + ): + await self.send_command( + HCI_LE_Set_Advertising_Set_Random_Address_Command( + advertising_handle=adv_handle, random_address=self.random_address + ), # type: ignore[call-arg] + check_result=True, + ) + + # Enable advertising + await self.send_command( + HCI_LE_Set_Extended_Advertising_Enable_Command( + enable=1, + advertising_handles=[adv_handle], + durations=[0], # Forever + max_extended_advertising_events=[0], # Infinite + ), # type: ignore[call-arg] + check_result=True, + ) + + return adv_handle + + async def stop_extended_advertising(self, adv_handle: int) -> None: + """Stops an extended advertising set. + + Args: + adv_handle: Handle of the advertising set to stop. + """ + self.extended_advertising_handles.remove(adv_handle) + # Disable advertising + await self.send_command( + HCI_LE_Set_Extended_Advertising_Enable_Command( + enable=0, + advertising_handles=[adv_handle], + durations=[0], + max_extended_advertising_events=[0], + ), # type: ignore[call-arg] + check_result=True, + ) + @property def is_advertising(self): return self.advertising diff --git a/bumble/hci.py b/bumble/hci.py index 45cd7ebcb..7ca3a3244 100644 --- a/bumble/hci.py +++ b/bumble/hci.py @@ -3828,9 +3828,9 @@ class HCI_LE_Set_Advertising_Set_Random_Address_Command(HCI_Command): 'advertising_event_properties', { 'size': 2, - 'mapper': lambda x: HCI_LE_Set_Extended_Advertising_Parameters_Command.advertising_properties_string( + 'mapper': lambda x: HCI_LE_Set_Extended_Advertising_Parameters_Command.AdvertisingProperties( x - ), + ).name, }, ), ('primary_advertising_interval_min', 3), @@ -3839,9 +3839,9 @@ class HCI_LE_Set_Advertising_Set_Random_Address_Command(HCI_Command): 'primary_advertising_channel_map', { 'size': 1, - 'mapper': lambda x: HCI_LE_Set_Extended_Advertising_Parameters_Command.channel_map_string( + 'mapper': lambda x: HCI_LE_Set_Extended_Advertising_Parameters_Command.ChannelMap( x - ), + ).name, }, ), ('own_address_type', OwnAddressType.TYPE_SPEC), @@ -3862,38 +3862,19 @@ class HCI_LE_Set_Extended_Advertising_Parameters_Command(HCI_Command): See Bluetooth spec @ 7.8.53 LE Set Extended Advertising Parameters Command ''' - CONNECTABLE_ADVERTISING = 0 - SCANNABLE_ADVERTISING = 1 - DIRECTED_ADVERTISING = 2 - HIGH_DUTY_CYCLE_DIRECTED_CONNECTABLE_ADVERTISING = 3 - USE_LEGACY_ADVERTISING_PDUS = 4 - ANONYMOUS_ADVERTISING = 5 - INCLUDE_TX_POWER = 6 - - ADVERTISING_PROPERTIES_NAMES = ( - 'CONNECTABLE_ADVERTISING', - 'SCANNABLE_ADVERTISING', - 'DIRECTED_ADVERTISING', - 'HIGH_DUTY_CYCLE_DIRECTED_CONNECTABLE_ADVERTISING', - 'USE_LEGACY_ADVERTISING_PDUS', - 'ANONYMOUS_ADVERTISING', - 'INCLUDE_TX_POWER', - ) - - CHANNEL_37 = 0 - CHANNEL_38 = 1 - CHANNEL_39 = 2 - - CHANNEL_NAMES = ('37', '38', '39') - - @classmethod - def advertising_properties_string(cls, properties): - # pylint: disable=line-too-long - return f'[{",".join(bit_flags_to_strings(properties, cls.ADVERTISING_PROPERTIES_NAMES))}]' + class AdvertisingProperties(enum.IntFlag): + CONNECTABLE_ADVERTISING = 1 << 0 + SCANNABLE_ADVERTISING = 1 << 1 + DIRECTED_ADVERTISING = 1 << 2 + HIGH_DUTY_CYCLE_DIRECTED_CONNECTABLE_ADVERTISING = 1 << 3 + USE_LEGACY_ADVERTISING_PDUS = 1 << 4 + ANONYMOUS_ADVERTISING = 1 << 5 + INCLUDE_TX_POWER = 1 << 6 - @classmethod - def channel_map_string(cls, channel_map): - return f'[{",".join(bit_flags_to_strings(channel_map, cls.CHANNEL_NAMES))}]' + class ChannelMap(enum.IntFlag): + CHANNEL_37 = 1 << 0 + CHANNEL_38 = 1 << 1 + CHANNEL_39 = 1 << 2 # ----------------------------------------------------------------------------- @@ -3905,9 +3886,9 @@ def channel_map_string(cls, channel_map): 'operation', { 'size': 1, - 'mapper': lambda x: HCI_LE_Set_Extended_Advertising_Data_Command.operation_name( + 'mapper': lambda x: HCI_LE_Set_Extended_Advertising_Data_Command.Operation( x - ), + ).name, }, ), ('fragment_preference', 1), @@ -3925,23 +3906,12 @@ class HCI_LE_Set_Extended_Advertising_Data_Command(HCI_Command): See Bluetooth spec @ 7.8.54 LE Set Extended Advertising Data Command ''' - INTERMEDIATE_FRAGMENT = 0x00 - FIRST_FRAGMENT = 0x01 - LAST_FRAGMENT = 0x02 - COMPLETE_DATA = 0x03 - UNCHANGED_DATA = 0x04 - - OPERATION_NAMES = { - INTERMEDIATE_FRAGMENT: 'INTERMEDIATE_FRAGMENT', - FIRST_FRAGMENT: 'FIRST_FRAGMENT', - LAST_FRAGMENT: 'LAST_FRAGMENT', - COMPLETE_DATA: 'COMPLETE_DATA', - UNCHANGED_DATA: 'UNCHANGED_DATA', - } - - @classmethod - def operation_name(cls, operation): - return name_or_number(cls.OPERATION_NAMES, operation) + class Operation(enum.IntEnum): + INTERMEDIATE_FRAGMENT = 0x00 + FIRST_FRAGMENT = 0x01 + LAST_FRAGMENT = 0x02 + COMPLETE_DATA = 0x03 + UNCHANGED_DATA = 0x04 # ----------------------------------------------------------------------------- @@ -3953,9 +3923,9 @@ def operation_name(cls, operation): 'operation', { 'size': 1, - 'mapper': lambda x: HCI_LE_Set_Extended_Advertising_Data_Command.operation_name( + 'mapper': lambda x: HCI_LE_Set_Extended_Advertising_Data_Command.Operation( x - ), + ).name, }, ), ('fragment_preference', 1), @@ -3973,22 +3943,6 @@ class HCI_LE_Set_Extended_Scan_Response_Data_Command(HCI_Command): See Bluetooth spec @ 7.8.55 LE Set Extended Scan Response Data Command ''' - INTERMEDIATE_FRAGMENT = 0x00 - FIRST_FRAGMENT = 0x01 - LAST_FRAGMENT = 0x02 - COMPLETE_DATA = 0x03 - - OPERATION_NAMES = { - INTERMEDIATE_FRAGMENT: 'INTERMEDIATE_FRAGMENT', - FIRST_FRAGMENT: 'FIRST_FRAGMENT', - LAST_FRAGMENT: 'LAST_FRAGMENT', - COMPLETE_DATA: 'COMPLETE_DATA', - } - - @classmethod - def operation_name(cls, operation): - return name_or_number(cls.OPERATION_NAMES, operation) - # ----------------------------------------------------------------------------- @HCI_Command.command( diff --git a/examples/run_extended_advertiser.py b/examples/run_extended_advertiser.py new file mode 100644 index 000000000..20b0b341e --- /dev/null +++ b/examples/run_extended_advertiser.py @@ -0,0 +1,69 @@ +# Copyright 2021-2022 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# https://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +# ----------------------------------------------------------------------------- +# Imports +# ----------------------------------------------------------------------------- +import asyncio +import logging +import sys +import os +from bumble.device import AdvertisingType, Device +from bumble.hci import Address, HCI_LE_Set_Extended_Advertising_Parameters_Command + +from bumble.transport import open_transport_or_link + + +# ----------------------------------------------------------------------------- +async def main() -> None: + if len(sys.argv) < 3: + print( + 'Usage: run_extended_advertiser.py [type] [address]' + ) + print('example: run_extended_advertiser.py device1.json usb:0') + return + + if len(sys.argv) >= 4: + advertising_properties = ( + HCI_LE_Set_Extended_Advertising_Parameters_Command.AdvertisingProperties( + int(sys.argv[3]) + ) + ) + else: + advertising_properties = ( + HCI_LE_Set_Extended_Advertising_Parameters_Command.AdvertisingProperties.CONNECTABLE_ADVERTISING + ) + + if len(sys.argv) >= 5: + target = Address(sys.argv[4]) + else: + target = Address.ANY + + print('<<< connecting to HCI...') + async with await open_transport_or_link(sys.argv[2]) as hci_transport: + print('<<< connected') + + device = Device.from_config_file_with_hci( + sys.argv[1], hci_transport.source, hci_transport.sink + ) + await device.power_on() + await device.start_extended_advertising( + advertising_properties=advertising_properties, target=target + ) + await hci_transport.source.terminated + + +# ----------------------------------------------------------------------------- +logging.basicConfig(level=os.environ.get('BUMBLE_LOGLEVEL', 'DEBUG').upper()) +asyncio.run(main())