diff --git a/bumble/device.py b/bumble/device.py index 37f161087..3ba3fae07 100644 --- a/bumble/device.py +++ b/bumble/device.py @@ -21,7 +21,7 @@ import json import asyncio import logging -from contextlib import asynccontextmanager, AsyncExitStack +from contextlib import asynccontextmanager, AsyncExitStack, closing from dataclasses import dataclass from collections.abc import Iterable from typing import ( @@ -49,6 +49,7 @@ HCI_AUTHENTICATED_COMBINATION_KEY_GENERATED_FROM_P_256_TYPE, HCI_CENTRAL_ROLE, HCI_COMMAND_STATUS_PENDING, + HCI_CONNECTED_ISOCHRONOUS_STREAM_LE_SUPPORTED_FEATURE, HCI_CONNECTION_REJECTED_DUE_TO_LIMITED_RESOURCES_ERROR, HCI_DISPLAY_YES_NO_IO_CAPABILITY, HCI_DISPLAY_ONLY_IO_CAPABILITY, @@ -85,29 +86,35 @@ HCI_Constant, HCI_Create_Connection_Cancel_Command, HCI_Create_Connection_Command, + HCI_Create_Connection_Command, HCI_Disconnect_Command, HCI_Encryption_Change_Event, HCI_Error, HCI_IO_Capability_Request_Reply_Command, HCI_Inquiry_Cancel_Command, HCI_Inquiry_Command, + HCI_IsoDataPacket, + HCI_LE_Accept_CIS_Request_Command, HCI_LE_Add_Device_To_Resolving_List_Command, HCI_LE_Advertising_Report_Event, HCI_LE_Clear_Resolving_List_Command, HCI_LE_Connection_Update_Command, HCI_LE_Create_Connection_Cancel_Command, HCI_LE_Create_Connection_Command, + HCI_LE_Create_CIS_Command, HCI_LE_Enable_Encryption_Command, HCI_LE_Extended_Advertising_Report_Event, HCI_LE_Extended_Create_Connection_Command, HCI_LE_Rand_Command, HCI_LE_Read_PHY_Command, + HCI_LE_Reject_CIS_Request_Command, HCI_LE_Remove_Advertising_Set_Command, HCI_LE_Set_Address_Resolution_Enable_Command, HCI_LE_Set_Advertising_Data_Command, HCI_LE_Set_Advertising_Enable_Command, HCI_LE_Set_Advertising_Parameters_Command, HCI_LE_Set_Advertising_Set_Random_Address_Command, + HCI_LE_Set_CIG_Parameters_Command, HCI_LE_Set_Data_Length_Command, HCI_LE_Set_Default_PHY_Command, HCI_LE_Set_Extended_Scan_Enable_Command, @@ -116,6 +123,7 @@ HCI_LE_Set_Extended_Advertising_Data_Command, HCI_LE_Set_Extended_Advertising_Enable_Command, HCI_LE_Set_Extended_Advertising_Parameters_Command, + HCI_LE_Set_Host_Feature_Command, HCI_LE_Set_PHY_Command, HCI_LE_Set_Random_Address_Command, HCI_LE_Set_Scan_Enable_Command, @@ -130,6 +138,7 @@ HCI_Switch_Role_Command, HCI_Set_Connection_Encryption_Command, HCI_StatusError, + HCI_SynchronousDataPacket, HCI_User_Confirmation_Request_Negative_Reply_Command, HCI_User_Confirmation_Request_Reply_Command, HCI_User_Passkey_Request_Negative_Reply_Command, @@ -161,6 +170,7 @@ from .utils import ( AsyncRunner, CompositeEventEmitter, + EventWatcher, setup_event_forwarding, composite_listener, deprecated, @@ -592,6 +602,46 @@ class ConnectionParametersPreferences: ConnectionParametersPreferences.default = ConnectionParametersPreferences() +# ----------------------------------------------------------------------------- +@dataclass +class ScoLink(CompositeEventEmitter): + device: Device + acl_connection: Connection + handle: int + link_type: int + + def __post_init__(self): + super().__init__() + + async def disconnect( + self, reason: int = HCI_REMOTE_USER_TERMINATED_CONNECTION_ERROR + ) -> None: + await self.device.disconnect(self, reason) + + +# ----------------------------------------------------------------------------- +@dataclass +class CisLink(CompositeEventEmitter): + class State(IntEnum): + PENDING = 0 + ESTABLISHED = 1 + + device: Device + acl_connection: Connection # Based ACL connection + handle: int # CIS handle assigned by Controller (in LE_Set_CIG_Parameters Complete or LE_CIS_Request events) + cis_id: int # CIS ID assigned by Central device + cig_id: int # CIG ID assigned by Central device + state: State = State.PENDING + + def __post_init__(self): + super().__init__() + + async def disconnect( + self, reason: int = HCI_REMOTE_USER_TERMINATED_CONNECTION_ERROR + ) -> None: + await self.device.disconnect(self, reason) + + # ----------------------------------------------------------------------------- class Connection(CompositeEventEmitter): device: Device @@ -870,6 +920,7 @@ def __init__(self) -> None: self.keystore = None self.gatt_services: List[Dict[str, Any]] = [] self.address_resolution_offload = False + self.cis_enabled = False def load_from_dict(self, config: Dict[str, Any]) -> None: # Load simple properties @@ -905,6 +956,7 @@ def load_from_dict(self, config: Dict[str, Any]) -> None: self.address_resolution_offload = config.get( 'address_resolution_offload', self.address_resolution_offload ) + self.cis_enabled = config.get('cis_enabled', self.cis_enabled) # Load or synthesize an IRK irk = config.get('irk') @@ -1012,6 +1064,8 @@ class Device(CompositeEventEmitter): advertisement_accumulators: Dict[Address, AdvertisementDataAccumulator] config: DeviceConfiguration extended_advertising_handles: Set[int] + sco_links: Dict[int, ScoLink] + cis_links: Dict[int, CisLink] @composite_listener class Listener: @@ -1104,6 +1158,8 @@ def __init__( self.disconnecting = False self.connections = {} # Connections, by connection handle self.pending_connections = {} # Connections, by BD address (BR/EDR only) + self.sco_links = {} # ScoLinks, by connection handle (BR/EDR only) + self.cis_links = {} # CisLinks, by connection handle (LE only) self.classic_enabled = False self.inquiry_response = None self.address_resolver = None @@ -1133,6 +1189,7 @@ def __init__( self.le_enabled = config.le_enabled self.classic_enabled = config.classic_enabled self.le_simultaneous_enabled = config.le_simultaneous_enabled + self.cis_enabled = config.cis_enabled self.classic_sc_enabled = config.classic_sc_enabled self.classic_ssp_enabled = config.classic_ssp_enabled self.classic_smp_enabled = config.classic_smp_enabled @@ -1443,6 +1500,16 @@ async def power_on(self) -> None: ) # type: ignore[call-arg] ) + if self.cis_enabled: + await self.send_command( + HCI_LE_Set_Host_Feature_Command( # type: ignore[call-arg] + bit_number=( + HCI_CONNECTED_ISOCHRONOUS_STREAM_LE_SUPPORTED_FEATURE + ), + bit_value=1, + ) + ) + if self.classic_enabled: await self.send_command( HCI_Write_Local_Name_Command(local_name=self.name.encode('utf8')) # type: ignore[call-arg] @@ -2366,7 +2433,9 @@ async def cancel_connection(self, peer_address=None): check_result=True, ) - async def disconnect(self, connection, reason): + async def disconnect( + self, connection: Union[Connection, ScoLink, CisLink], reason: int + ) -> None: # Create a future so that we can wait for the disconnection's result pending_disconnection = asyncio.get_running_loop().create_future() connection.on('disconnection', pending_disconnection.set_result) @@ -2374,7 +2443,7 @@ async def disconnect(self, connection, reason): # Request a disconnection result = await self.send_command( - HCI_Disconnect_Command(connection_handle=connection.handle, reason=reason) + HCI_Disconnect_Command(connection_handle=connection.handle, reason=reason) # type: ignore[call-arg] ) try: @@ -2837,6 +2906,142 @@ async def request_remote_name(self, remote: Union[Address, Connection]) -> str: self.remove_listener('remote_name', handler) self.remove_listener('remote_name_failure', failure_handler) + # [LE only] + @experimental('Only for testing.') + async def setup_cig( + self, + cig_id: int, + cis_id: List[int], + sdu_interval: Tuple[int, int], + framing: int, + max_sdu: Tuple[int, int], + retransmission_number: int, + max_transport_latency: Tuple[int, int], + ) -> List[int]: + """Sends HCI_LE_Set_CIG_Parameters_Command. + + Args: + cig_id: CIG_ID. + cis_id: CID ID list. + sdu_interval: SDU intervals of (Central->Peripheral, Peripheral->Cental). + framing: Un-framing(0) or Framing(1). + max_sdu: Max SDU counts of (Central->Peripheral, Peripheral->Cental). + retransmission_number: retransmission_number. + max_transport_latency: Max transport latencies of + (Central->Peripheral, Peripheral->Cental). + + Returns: + List of created CIS handles corresponding to the same order of [cid_id]. + """ + num_cis = len(cis_id) + response = await self.send_command( + HCI_LE_Set_CIG_Parameters_Command( # type: ignore[call-arg] + cig_id=cig_id, + sdu_interval_c_to_p=sdu_interval[0], + sdu_interval_p_to_c=sdu_interval[1], + worst_case_sca=0x00, # 251-500 ppm + packing=0x00, # Sequential + framing=framing, + max_transport_latency_c_to_p=max_transport_latency[0], + max_transport_latency_p_to_c=max_transport_latency[1], + cis_id=cis_id, + max_sdu_c_to_p=[max_sdu[0]] * num_cis, + max_sdu_p_to_c=[max_sdu[1]] * num_cis, + phy_c_to_p=[HCI_LE_2M_PHY] * num_cis, + phy_p_to_c=[HCI_LE_2M_PHY] * num_cis, + rtn_c_to_p=[retransmission_number] * num_cis, + rtn_p_to_c=[retransmission_number] * num_cis, + ), + check_result=True, + ) + + return response.return_parameters.connection_handle[:] + + # [LE only] + @experimental('Only for testing.') + async def create_cis(self, cis_acl_pairs: List[Tuple[int, int]]) -> None: + for cis_handle, acl_handle in cis_acl_pairs: + acl_connection = self.lookup_connection(acl_handle) + assert acl_connection + self.cis_links[cis_handle] = CisLink( + device=self, + acl_connection=acl_connection, + handle=cis_handle, + cis_id=0, + cig_id=0, + ) + result = await self.send_command( + HCI_LE_Create_CIS_Command( # type: ignore[call-arg] + cis_connection_handle=[p[0] for p in cis_acl_pairs], + acl_connection_handle=[p[1] for p in cis_acl_pairs], + ), + ) + if result.status != HCI_COMMAND_STATUS_PENDING: + logger.warning( + 'HCI_LE_Create_CIS_Command failed: ' + f'{HCI_Constant.error_name(result.status)}' + ) + raise HCI_StatusError(result) + + pending_cis_establishments: Dict[int, asyncio.Future[None]] = {} + for cis_handle, _ in cis_acl_pairs: + pending_cis_establishments[ + cis_handle + ] = asyncio.get_running_loop().create_future() + + with closing(EventWatcher()) as watcher: + + @watcher.on(self, 'cis_establishment') + def on_cis_establishment(cis_handle: int) -> None: + if pending_future := pending_cis_establishments.get(cis_handle, None): + pending_future.set_result(None) + + await asyncio.gather(*pending_cis_establishments.values()) + + # [LE only] + @experimental('Only for testing.') + async def answer_cis_request( + self, + handle: int, + accept: bool, + reason: int = HCI_REMOTE_USER_TERMINATED_CONNECTION_ERROR, + ) -> None: + if accept: + result = await self.send_command( + HCI_LE_Accept_CIS_Request_Command( # type: ignore[call-arg] + connection_handle=handle + ), + ) + if result.status != HCI_COMMAND_STATUS_PENDING: + logger.warning( + 'HCI_LE_Accept_CIS_Request_Command failed: ' + f'{HCI_Constant.error_name(result.status)}' + ) + raise HCI_StatusError(result) + + pending_cis_establishment = asyncio.get_running_loop().create_future() + + with closing(EventWatcher()) as watcher: + + @watcher.on(self, 'cis_establishment') + def on_cis_establishment(cis_handle: int) -> None: + if cis_handle == handle: + pending_cis_establishment.set_result(None) + + await pending_cis_establishment + else: + result = await self.send_command( + HCI_LE_Reject_CIS_Request_Command( # type: ignore[call-arg] + connection_handle=handle, reason=reason + ), + ) + if result.status != HCI_COMMAND_STATUS_PENDING: + logger.warning( + 'HCI_LE_Reject_CIS_Request_Command failed: ' + f'{HCI_Constant.error_name(result.status)}' + ) + raise HCI_StatusError(result) + @host_event_handler def on_flush(self): self.emit('flush') @@ -3041,31 +3246,32 @@ def on_connection_request(self, bd_addr, class_of_device, link_type): ) @host_event_handler - @with_connection_from_handle - def on_disconnection(self, connection, reason): - logger.debug( - f'*** Disconnection: [0x{connection.handle:04X}] ' - f'{connection.peer_address} as {connection.role_name}, reason={reason}' - ) - connection.emit('disconnection', reason) - - # Remove the connection from the map - del self.connections[connection.handle] - - # Cleanup subsystems that maintain per-connection state - self.gatt_server.on_disconnection(connection) - - # Restart advertising if auto-restart is enabled - if self.auto_restart_advertising: - logger.debug('restarting advertising') - self.abort_on( - 'flush', - self.start_advertising( - advertising_type=self.advertising_type, - own_address_type=self.advertising_own_address_type, - auto_restart=True, - ), + def on_disconnection(self, connection_handle: int, reason: int) -> None: + if connection := self.connections.pop(connection_handle, None): + logger.debug( + f'*** Disconnection: [0x{connection.handle:04X}] ' + f'{connection.peer_address} as {connection.role_name}, reason={reason}' ) + connection.emit('disconnection', reason) + + # Cleanup subsystems that maintain per-connection state + self.gatt_server.on_disconnection(connection) + + # Restart advertising if auto-restart is enabled + if self.auto_restart_advertising: + logger.debug('restarting advertising') + self.abort_on( + 'flush', + self.start_advertising( + advertising_type=self.advertising_type, # type: ignore[arg-type] + own_address_type=self.advertising_own_address_type, # type: ignore[arg-type] + auto_restart=True, + ), + ) + elif sco_link := self.sco_links.pop(connection_handle, None): + sco_link.emit('disconnection', reason) + elif cis_link := self.cis_links.pop(connection_handle, None): + cis_link.emit('disconnection', reason) @host_event_handler @with_connection_from_handle @@ -3343,6 +3549,96 @@ def on_remote_name_failure(self, connection: Connection, address, error): connection.emit('remote_name_failure', error) self.emit('remote_name_failure', address, error) + # [Classic only] + @host_event_handler + @with_connection_from_address + def on_sco_connection( + self, acl_connection: Connection, sco_handle: int, link_type: int + ) -> None: + logger.debug( + f'*** SCO connected: {acl_connection.peer_address}, ' + f'sco_handle=[0x{sco_handle:04X}], ' + f'link_type=[0x{link_type:02X}] ***' + ) + self.sco_links[sco_handle] = ScoLink( + device=self, + acl_connection=acl_connection, + handle=sco_handle, + link_type=link_type, + ) + + # [Classic only] + @host_event_handler + @with_connection_from_address + def on_sco_connection_failure( + self, acl_connection: Connection, status: int + ) -> None: + logger.debug(f'*** SCO connection failure: {acl_connection.peer_address}***') + + # [Classic only] + @host_event_handler + def on_sco_packet(self, sco_handle: int, packet: HCI_SynchronousDataPacket) -> None: + if sco_link := self.sco_links.get(sco_handle, None): + sco_link.emit('pdu', packet) + + # [LE only] + @host_event_handler + @with_connection_from_handle + def on_cis_request( + self, + acl_connection: Connection, + cis_handle: int, + cig_id: int, + cis_id: int, + ) -> None: + logger.debug( + f'*** CIS Request ' + f'acl_handle=[0x{acl_connection.handle:04X}]{acl_connection.peer_address}, ' + f'cis_handle=[0x{cis_handle:04X}], ' + f'cig_id=[0x{cig_id:02X}], ' + f'cis_id=[0x{cis_id:02X}] ***' + ) + # LE_CIS_Established event doesn't provide info, so we must store them here. + self.cis_links[cis_handle] = CisLink( + device=self, + acl_connection=acl_connection, + handle=cis_handle, + cig_id=cig_id, + cis_id=cis_id, + ) + self.emit('cis_request', acl_connection, cis_handle, cig_id, cis_id) + + # [LE only] + @host_event_handler + def on_cis_establishment(self, cis_handle: int) -> None: + cis_link = self.cis_links[cis_handle] + cis_link.state = CisLink.State.ESTABLISHED + + logger.debug( + f'*** CIS Established ' + f'{cis_link.acl_connection.peer_address}, ' + f'cis_handle=[0x{cis_handle:04X}], ' + f'cig_id=[0x{cis_link.cig_id:02X}], ' + f'cis_id=[0x{cis_link.cis_id:02X}] ***' + ) + + cis_link.emit('establishment') + self.emit('cis_establishment', cis_handle) + + # [LE only] + @host_event_handler + def on_cis_establishment_failure(self, cis_handle: int, status: int) -> None: + logger.debug(f'*** CIS Established Failure: cis=[0x{cis_handle:04X}] ***') + if cis_link := self.cis_links.pop(cis_handle, None): + cis_link.emit('establishment_failure') + self.emit('cis_establishment_failure', cis_handle, status) + + # [LE only] + @host_event_handler + def on_iso_packet(self, handle: int, packet: HCI_IsoDataPacket) -> None: + if cis_link := self.cis_links.get(handle, None): + cis_link.emit('pdu', packet) + @host_event_handler @with_connection_from_handle def on_connection_encryption_change(self, connection, encryption): diff --git a/bumble/hci.py b/bumble/hci.py index 8897624ea..3bf6e88bc 100644 --- a/bumble/hci.py +++ b/bumble/hci.py @@ -4451,7 +4451,10 @@ class HCI_LE_Accept_CIS_Request_Command(HCI_Command): # ----------------------------------------------------------------------------- @HCI_Command.command( - fields=[('connection_handle', 2)], + fields=[ + ('connection_handle', 2), + ('reason', {'size': 1, 'mapper': HCI_Constant.error_name}), + ], ) class HCI_LE_Reject_CIS_Request_Command(HCI_Command): ''' @@ -4459,6 +4462,7 @@ class HCI_LE_Reject_CIS_Request_Command(HCI_Command): ''' connection_handle: int + reason: int # ----------------------------------------------------------------------------- diff --git a/bumble/host.py b/bumble/host.py index a649eb6d5..b06ceba48 100644 --- a/bumble/host.py +++ b/bumble/host.py @@ -32,8 +32,8 @@ Address, HCI_ACL_DATA_PACKET, HCI_COMMAND_PACKET, - HCI_COMMAND_COMPLETE_EVENT, HCI_EVENT_PACKET, + HCI_ISO_DATA_PACKET, HCI_LE_READ_BUFFER_SIZE_COMMAND, HCI_LE_READ_LOCAL_SUPPORTED_FEATURES_COMMAND, HCI_LE_READ_SUGGESTED_DEFAULT_DATA_LENGTH_COMMAND, @@ -52,6 +52,7 @@ HCI_Constant, HCI_Error, HCI_Event, + HCI_IsoDataPacket, HCI_LE_Long_Term_Key_Request_Negative_Reply_Command, HCI_LE_Long_Term_Key_Request_Reply_Command, HCI_LE_Read_Buffer_Size_Command, @@ -75,7 +76,6 @@ BT_LE_TRANSPORT, ConnectionPHY, ConnectionParameters, - InvalidStateError, ) from .utils import AbortableEventEmitter from .transport.common import TransportLostError @@ -243,7 +243,7 @@ async def reset(self, driver_factory=drivers.get_driver_for_host): # understand le_event_mask = bytes.fromhex('1F00000000000000') else: - le_event_mask = bytes.fromhex('FFFFF00000000000') + le_event_mask = bytes.fromhex('FFFFFFFF00000000') await self.send_command( HCI_LE_Set_Event_Mask_Command(le_event_mask=le_event_mask) @@ -495,6 +495,8 @@ def on_hci_packet(self, packet: HCI_Packet) -> None: self.on_hci_acl_data_packet(cast(HCI_AclDataPacket, packet)) elif packet.hci_packet_type == HCI_SYNCHRONOUS_DATA_PACKET: self.on_hci_sco_data_packet(cast(HCI_SynchronousDataPacket, packet)) + elif packet.hci_packet_type == HCI_ISO_DATA_PACKET: + self.on_hci_iso_data_packet(cast(HCI_IsoDataPacket, packet)) else: logger.warning(f'!!! unknown packet type {packet.hci_packet_type}') @@ -515,6 +517,10 @@ def on_hci_sco_data_packet(self, packet: HCI_SynchronousDataPacket) -> None: # Experimental self.emit('sco_packet', packet.connection_handle, packet) + def on_hci_iso_data_packet(self, packet: HCI_IsoDataPacket) -> None: + # Experimental + self.emit('iso_packet', packet.connection_handle, packet) + def on_l2cap_pdu(self, connection: Connection, cid: int, pdu: bytes) -> None: self.emit('l2cap_pdu', connection.handle, cid, pdu) @@ -715,6 +721,24 @@ def on_hci_le_advertising_report_event(self, event): def on_hci_le_extended_advertising_report_event(self, event): self.on_hci_le_advertising_report_event(event) + def on_hci_le_cis_request_event(self, event): + self.emit( + 'cis_request', + event.acl_connection_handle, + event.cis_connection_handle, + event.cig_id, + event.cis_id, + ) + + def on_hci_le_cis_established_event(self, event): + # The remaining parameters are unused for now. + if event.status == HCI_SUCCESS: + self.emit('cis_establishment', event.connection_handle) + else: + self.emit( + 'cis_establishment_failure', event.connection_handle, event.status + ) + def on_hci_le_remote_connection_parameter_request_event(self, event): if event.connection_handle not in self.connections: logger.warning('!!! REMOTE CONNECTION PARAMETER REQUEST: unknown handle') diff --git a/bumble/transport/common.py b/bumble/transport/common.py index 2786a75e9..53e52235d 100644 --- a/bumble/transport/common.py +++ b/bumble/transport/common.py @@ -150,7 +150,7 @@ def feed_data(self, data: bytes) -> None: try: self.sink.on_packet(bytes(self.packet)) except Exception as error: - logger.warning( + logger.exception( color(f'!!! Exception in on_packet: {error}', 'red') ) self.reset() diff --git a/examples/leaudio.json b/examples/leaudio.json new file mode 100644 index 000000000..4b6edfcee --- /dev/null +++ b/examples/leaudio.json @@ -0,0 +1,5 @@ +{ + "name": "Bumble-LEA", + "keystore": "JsonKeyStore", + "advertising_interval": 100 +} diff --git a/examples/run_cig_setup.py b/examples/run_cig_setup.py new file mode 100644 index 000000000..25ddae9c2 --- /dev/null +++ b/examples/run_cig_setup.py @@ -0,0 +1,102 @@ +# Copyright 2021-2022 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# https://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +# ----------------------------------------------------------------------------- +# Imports +# ----------------------------------------------------------------------------- +import asyncio +import logging +import sys +import os +from bumble.device import ( + HCI_LE_Set_Extended_Advertising_Parameters_Command, + Device, + Connection, +) +from bumble.hci import OwnAddressType + +from bumble.transport import open_transport_or_link + + +# ----------------------------------------------------------------------------- +async def main() -> None: + if len(sys.argv) < 3: + print( + 'Usage: run_cig_setup.py ' + ' ' + ) + print( + 'example: run_cig_setup.py device1.json' + 'tcp-client:127.0.0.1:6402 tcp-client:127.0.0.1:6402' + ) + return + + print('<<< connecting to HCI...') + hci_transports = await asyncio.gather( + open_transport_or_link(sys.argv[2]), open_transport_or_link(sys.argv[3]) + ) + print('<<< connected') + + devices = [ + Device.from_config_file_with_hci( + sys.argv[1], hci_transport.source, hci_transport.sink + ) + for hci_transport in hci_transports + ] + + devices[0].cis_enabled = True + devices[1].cis_enabled = True + + await asyncio.gather(*[device.power_on() for device in devices]) + await devices[0].start_extended_advertising( + advertising_properties=( + HCI_LE_Set_Extended_Advertising_Parameters_Command.AdvertisingProperties.CONNECTABLE_ADVERTISING + ), + own_address_type=OwnAddressType.PUBLIC, + ) + + connection = await devices[1].connect( + devices[0].public_address, own_address_type=OwnAddressType.PUBLIC + ) + + cid_ids = [2, 3] + cis_handles = await devices[1].setup_cig( + cig_id=1, + cis_id=cid_ids, + sdu_interval=(10000, 0), + framing=0, + max_sdu=(120, 0), + retransmission_number=13, + max_transport_latency=(100, 0), + ) + + def on_cis_request( + connection: Connection, cis_handle: int, _cig_id: int, _cis_id: int + ): + connection.abort_on( + 'disconnection', devices[0].answer_cis_request(cis_handle, accept=True) + ) + + devices[0].on('cis_request', on_cis_request) + + await devices[1].create_cis([(cis, connection.handle) for cis in cis_handles]) + + await asyncio.gather( + *[hci_transport.source.terminated for hci_transport in hci_transports] + ) + + +# ----------------------------------------------------------------------------- +logging.basicConfig(level=os.environ.get('BUMBLE_LOGLEVEL', 'DEBUG').upper()) +asyncio.run(main())