Skip to content

Commit

Permalink
Add support for Extended Advertising
Browse files Browse the repository at this point in the history
  • Loading branch information
zxzxwu committed Nov 18, 2023
1 parent 0667e83 commit 9f4c186
Show file tree
Hide file tree
Showing 3 changed files with 223 additions and 72 deletions.
128 changes: 128 additions & 0 deletions bumble/device.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
Optional,
Tuple,
Type,
Set,
Union,
cast,
overload,
Expand Down Expand Up @@ -103,9 +104,14 @@
HCI_LE_Set_Advertising_Data_Command,
HCI_LE_Set_Advertising_Enable_Command,
HCI_LE_Set_Advertising_Parameters_Command,
HCI_LE_Set_Advertising_Set_Random_Address_Command,
HCI_LE_Set_Default_PHY_Command,
HCI_LE_Set_Extended_Scan_Enable_Command,
HCI_LE_Set_Extended_Scan_Parameters_Command,
HCI_LE_Set_Extended_Scan_Response_Data_Command,
HCI_LE_Set_Extended_Advertising_Data_Command,
HCI_LE_Set_Extended_Advertising_Enable_Command,
HCI_LE_Set_Extended_Advertising_Parameters_Command,
HCI_LE_Set_PHY_Command,
HCI_LE_Set_Random_Address_Command,
HCI_LE_Set_Scan_Enable_Command,
Expand Down Expand Up @@ -956,6 +962,7 @@ class Device(CompositeEventEmitter):
]
advertisement_accumulators: Dict[Address, AdvertisementDataAccumulator]
config: DeviceConfiguration
extended_advertising_handles: Set[int]

@composite_listener
class Listener:
Expand Down Expand Up @@ -1054,6 +1061,7 @@ def __init__(
self.classic_pending_accepts = {
Address.ANY: []
} # Futures, by BD address OR [Futures] for Address.ANY
self.extended_advertising_handles = set()

# Own address type cache
self.advertising_own_address_type = None
Expand Down Expand Up @@ -1532,6 +1540,126 @@ async def stop_advertising(self) -> None:
self.advertising = False
self.auto_restart_advertising = False

async def start_extended_advertising(
self,
advertising_properties: HCI_LE_Set_Extended_Advertising_Parameters_Command.AdvertisingProperties = HCI_LE_Set_Extended_Advertising_Parameters_Command.AdvertisingProperties.CONNECTABLE_ADVERTISING,
target: Address = Address.ANY,
own_address_type: int = OwnAddressType.RANDOM,
scan_response: Optional[bytes] = None,
advertising_data: Optional[bytes] = None,
) -> int:
"""Starts an extended advertising set.
Args:
advertising_properties: Properties to pass in HCI_LE_Set_Extended_Advertising_Parameters_Command
target: Directed advertising target. Directed property should be set in advertising_properties arg.
own_address_type: own address type to use in the advertising.
scan_response: raw scan response. When a non-none value is set, HCI_LE_Set_Extended_Scan_Response_Data_Command will be sent.
advertising_data: raw advertising data. When a non-none value is set, HCI_LE_Set_Advertising_Set_Random_Address_Command will be sent.
Returns:
Handle of the new advertising set.
"""

adv_handle = (
0
if not self.extended_advertising_handles
else max(self.extended_advertising_handles) + 1
)
self.extended_advertising_handles.add(adv_handle)

# Set the advertising parameters
await self.send_command(
HCI_LE_Set_Extended_Advertising_Parameters_Command(
advertising_handle=adv_handle,
advertising_event_properties=advertising_properties,
primary_advertising_interval_min=self.advertising_interval_min,
primary_advertising_interval_max=self.advertising_interval_max,
primary_advertising_channel_map=(
HCI_LE_Set_Extended_Advertising_Parameters_Command.ChannelMap.CHANNEL_37
| HCI_LE_Set_Extended_Advertising_Parameters_Command.ChannelMap.CHANNEL_38
| HCI_LE_Set_Extended_Advertising_Parameters_Command.ChannelMap.CHANNEL_39
),
own_address_type=own_address_type,
peer_address_type=target.address_type,
peer_address=target,
advertising_tx_power=7,
advertising_filter_policy=0,
primary_advertising_phy=1, # LE 1M
secondary_advertising_max_skip=0,
secondary_advertising_phy=1, # LE 1M
advertising_sid=0,
scan_request_notification_enable=0,
), # type: ignore[call-arg]
check_result=True,
)

# Set the advertising data if present
if advertising_data is not None:
await self.send_command(
HCI_LE_Set_Extended_Advertising_Data_Command(
advertising_handle=adv_handle,
operation=HCI_LE_Set_Extended_Advertising_Data_Command.Operation.COMPLETE_DATA,
fragment_preference=0x01, # Should not fragment
advertising_data=advertising_data,
), # type: ignore[call-arg]
check_result=True,
)

# Set the scan response if present
if scan_response is not None:
await self.send_command(
HCI_LE_Set_Extended_Scan_Response_Data_Command(
advertising_handle=adv_handle,
operation=HCI_LE_Set_Extended_Advertising_Data_Command.Operation.COMPLETE_DATA,
fragment_preference=0x01, # Should not fragment
scan_response_data=scan_response,
), # type: ignore[call-arg]
check_result=True,
)

if own_address_type in (
OwnAddressType.RANDOM,
OwnAddressType.RESOLVABLE_OR_RANDOM,
):
await self.send_command(
HCI_LE_Set_Advertising_Set_Random_Address_Command(
advertising_handle=adv_handle, random_address=self.random_address
), # type: ignore[call-arg]
check_result=True,
)

# Enable advertising
await self.send_command(
HCI_LE_Set_Extended_Advertising_Enable_Command(
enable=1,
advertising_handles=[adv_handle],
durations=[0], # Forever
max_extended_advertising_events=[0], # Infinite
), # type: ignore[call-arg]
check_result=True,
)

return adv_handle

async def stop_extended_advertising(self, adv_handle: int) -> None:
"""Stops an extended advertising set.
Args:
adv_handle: Handle of the advertising set to stop.
"""
self.extended_advertising_handles.remove(adv_handle)
# Disable advertising
await self.send_command(
HCI_LE_Set_Extended_Advertising_Enable_Command(
enable=0,
advertising_handles=[adv_handle],
durations=[0],
max_extended_advertising_events=[0],
), # type: ignore[call-arg]
check_result=True,
)

@property
def is_advertising(self):
return self.advertising
Expand Down
98 changes: 26 additions & 72 deletions bumble/hci.py
Original file line number Diff line number Diff line change
Expand Up @@ -3828,9 +3828,9 @@ class HCI_LE_Set_Advertising_Set_Random_Address_Command(HCI_Command):
'advertising_event_properties',
{
'size': 2,
'mapper': lambda x: HCI_LE_Set_Extended_Advertising_Parameters_Command.advertising_properties_string(
'mapper': lambda x: HCI_LE_Set_Extended_Advertising_Parameters_Command.AdvertisingProperties(
x
),
).name,
},
),
('primary_advertising_interval_min', 3),
Expand All @@ -3839,9 +3839,9 @@ class HCI_LE_Set_Advertising_Set_Random_Address_Command(HCI_Command):
'primary_advertising_channel_map',
{
'size': 1,
'mapper': lambda x: HCI_LE_Set_Extended_Advertising_Parameters_Command.channel_map_string(
'mapper': lambda x: HCI_LE_Set_Extended_Advertising_Parameters_Command.ChannelMap(
x
),
).name,
},
),
('own_address_type', OwnAddressType.TYPE_SPEC),
Expand All @@ -3862,38 +3862,19 @@ class HCI_LE_Set_Extended_Advertising_Parameters_Command(HCI_Command):
See Bluetooth spec @ 7.8.53 LE Set Extended Advertising Parameters Command
'''

CONNECTABLE_ADVERTISING = 0
SCANNABLE_ADVERTISING = 1
DIRECTED_ADVERTISING = 2
HIGH_DUTY_CYCLE_DIRECTED_CONNECTABLE_ADVERTISING = 3
USE_LEGACY_ADVERTISING_PDUS = 4
ANONYMOUS_ADVERTISING = 5
INCLUDE_TX_POWER = 6

ADVERTISING_PROPERTIES_NAMES = (
'CONNECTABLE_ADVERTISING',
'SCANNABLE_ADVERTISING',
'DIRECTED_ADVERTISING',
'HIGH_DUTY_CYCLE_DIRECTED_CONNECTABLE_ADVERTISING',
'USE_LEGACY_ADVERTISING_PDUS',
'ANONYMOUS_ADVERTISING',
'INCLUDE_TX_POWER',
)

CHANNEL_37 = 0
CHANNEL_38 = 1
CHANNEL_39 = 2

CHANNEL_NAMES = ('37', '38', '39')

@classmethod
def advertising_properties_string(cls, properties):
# pylint: disable=line-too-long
return f'[{",".join(bit_flags_to_strings(properties, cls.ADVERTISING_PROPERTIES_NAMES))}]'
class AdvertisingProperties(enum.IntFlag):
CONNECTABLE_ADVERTISING = 1 << 0
SCANNABLE_ADVERTISING = 1 << 1
DIRECTED_ADVERTISING = 1 << 2
HIGH_DUTY_CYCLE_DIRECTED_CONNECTABLE_ADVERTISING = 1 << 3
USE_LEGACY_ADVERTISING_PDUS = 1 << 4
ANONYMOUS_ADVERTISING = 1 << 5
INCLUDE_TX_POWER = 1 << 6

@classmethod
def channel_map_string(cls, channel_map):
return f'[{",".join(bit_flags_to_strings(channel_map, cls.CHANNEL_NAMES))}]'
class ChannelMap(enum.IntFlag):
CHANNEL_37 = 1 << 0
CHANNEL_38 = 1 << 1
CHANNEL_39 = 1 << 2


# -----------------------------------------------------------------------------
Expand All @@ -3905,9 +3886,9 @@ def channel_map_string(cls, channel_map):
'operation',
{
'size': 1,
'mapper': lambda x: HCI_LE_Set_Extended_Advertising_Data_Command.operation_name(
'mapper': lambda x: HCI_LE_Set_Extended_Advertising_Data_Command.Operation(
x
),
).name,
},
),
('fragment_preference', 1),
Expand All @@ -3925,23 +3906,12 @@ class HCI_LE_Set_Extended_Advertising_Data_Command(HCI_Command):
See Bluetooth spec @ 7.8.54 LE Set Extended Advertising Data Command
'''

INTERMEDIATE_FRAGMENT = 0x00
FIRST_FRAGMENT = 0x01
LAST_FRAGMENT = 0x02
COMPLETE_DATA = 0x03
UNCHANGED_DATA = 0x04

OPERATION_NAMES = {
INTERMEDIATE_FRAGMENT: 'INTERMEDIATE_FRAGMENT',
FIRST_FRAGMENT: 'FIRST_FRAGMENT',
LAST_FRAGMENT: 'LAST_FRAGMENT',
COMPLETE_DATA: 'COMPLETE_DATA',
UNCHANGED_DATA: 'UNCHANGED_DATA',
}

@classmethod
def operation_name(cls, operation):
return name_or_number(cls.OPERATION_NAMES, operation)
class Operation(enum.IntEnum):
INTERMEDIATE_FRAGMENT = 0x00
FIRST_FRAGMENT = 0x01
LAST_FRAGMENT = 0x02
COMPLETE_DATA = 0x03
UNCHANGED_DATA = 0x04


# -----------------------------------------------------------------------------
Expand All @@ -3953,9 +3923,9 @@ def operation_name(cls, operation):
'operation',
{
'size': 1,
'mapper': lambda x: HCI_LE_Set_Extended_Advertising_Data_Command.operation_name(
'mapper': lambda x: HCI_LE_Set_Extended_Advertising_Data_Command.Operation(
x
),
).name,
},
),
('fragment_preference', 1),
Expand All @@ -3973,22 +3943,6 @@ class HCI_LE_Set_Extended_Scan_Response_Data_Command(HCI_Command):
See Bluetooth spec @ 7.8.55 LE Set Extended Scan Response Data Command
'''

INTERMEDIATE_FRAGMENT = 0x00
FIRST_FRAGMENT = 0x01
LAST_FRAGMENT = 0x02
COMPLETE_DATA = 0x03

OPERATION_NAMES = {
INTERMEDIATE_FRAGMENT: 'INTERMEDIATE_FRAGMENT',
FIRST_FRAGMENT: 'FIRST_FRAGMENT',
LAST_FRAGMENT: 'LAST_FRAGMENT',
COMPLETE_DATA: 'COMPLETE_DATA',
}

@classmethod
def operation_name(cls, operation):
return name_or_number(cls.OPERATION_NAMES, operation)


# -----------------------------------------------------------------------------
@HCI_Command.command(
Expand Down
69 changes: 69 additions & 0 deletions examples/run_extended_advertiser.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
# Copyright 2021-2022 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

# -----------------------------------------------------------------------------
# Imports
# -----------------------------------------------------------------------------
import asyncio
import logging
import sys
import os
from bumble.device import AdvertisingType, Device
from bumble.hci import Address, HCI_LE_Set_Extended_Advertising_Parameters_Command

from bumble.transport import open_transport_or_link


# -----------------------------------------------------------------------------
async def main() -> None:
if len(sys.argv) < 3:
print(
'Usage: run_extended_advertiser.py <config-file> <transport-spec> [type] [address]'
)
print('example: run_extended_advertiser.py device1.json usb:0')
return

if len(sys.argv) >= 4:
advertising_properties = (
HCI_LE_Set_Extended_Advertising_Parameters_Command.AdvertisingProperties(
int(sys.argv[3])
)
)
else:
advertising_properties = (
HCI_LE_Set_Extended_Advertising_Parameters_Command.AdvertisingProperties.CONNECTABLE_ADVERTISING
)

if len(sys.argv) >= 5:
target = Address(sys.argv[4])
else:
target = Address.ANY

print('<<< connecting to HCI...')
async with await open_transport_or_link(sys.argv[2]) as hci_transport:
print('<<< connected')

device = Device.from_config_file_with_hci(
sys.argv[1], hci_transport.source, hci_transport.sink
)
await device.power_on()
await device.start_extended_advertising(
advertising_properties=advertising_properties, target=target
)
await hci_transport.source.terminated


# -----------------------------------------------------------------------------
logging.basicConfig(level=os.environ.get('BUMBLE_LOGLEVEL', 'DEBUG').upper())
asyncio.run(main())

0 comments on commit 9f4c186

Please sign in to comment.